#include <cstdlib>
#include <memory>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/data_block_footer.h"
#include "test_util/testutil.h"
#include "util/auto_tune_compressor.h"
#include "util/coding.h"
#include "util/random.h"
#include "util/simple_mixed_compressor.h"
namespace ROCKSDB_NAMESPACE {
// Fixture for DB-level compression tests. Uses a DB directory named
// "compression_test"; the second DBTestBase argument is passed through as-is
// (presumably env fsync behavior — confirm against DBTestBase's constructor).
class DBCompressionTest : public DBTestBase {
 public:
  DBCompressionTest() : DBTestBase("compression_test", true) {}
};
// Verifies that preset compression dictionaries reduce total SST size, for
// each supported compression type and each dictionary mode (no dict, raw
// content dict, ZSTD finalized dict, ZSTD trained dict), and that all data
// remains readable in every mode.
TEST_F(DBCompressionTest, PresetCompressionDict) {
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  // Use the test harness env so wrapper settings carry over.
  options.env = CurrentOptions().env;
  options.allow_concurrent_memtable_write = false;
  options.arena_block_size = kBlockSizeBytes;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;

  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;

  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+ supports dictionary compression
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif                           // LZ4_VERSION_NUMBER >= 10400
  // BUGFIX: this `if` was previously glued onto the `#endif` line
  // ("#endif if (ZSTD_Supported()) {"), which is invalid preprocessor syntax
  // and effectively dropped the ZSTD case.
  if (ZSTD_Supported()) {
    compression_types.push_back(kZSTD);
  }

  enum DictionaryTypes : int {
    kWithoutDict,
    kWithDict,
    kWithZSTDfinalizeDict,
    kWithZSTDTrainedDict,
    kDictEnd,
  };

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    size_t bytes_without_dict = 0;
    size_t bytes_with_dict = 0;
    size_t bytes_with_zstd_finalize_dict = 0;
    size_t bytes_with_zstd_trained_dict = 0;
    for (int i = kWithoutDict; i < kDictEnd; i++) {
      // Configure the dictionary options for this mode. ZSTD-specific modes
      // are skipped for other compression types or unsupported ZSTD builds.
      switch (i) {
        case kWithoutDict:
          options.compression_opts.max_dict_bytes = 0;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithDict:
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithZSTDfinalizeDict:
          if (compression_type != kZSTD ||
              !ZSTD_FinalizeDictionarySupported()) {
            continue;
          }
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          options.compression_opts.use_zstd_dict_trainer = false;
          break;
        case kWithZSTDTrainedDict:
          if (compression_type != kZSTD || !ZSTD_TrainDictionarySupported()) {
            continue;
          }
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          options.compression_opts.use_zstd_dict_trainer = true;
          break;
        default:
          assert(false);
      }

      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      // Reuse a small pool of values so neighboring blocks share content,
      // giving the dictionary redundancy to exploit.
      std::string seq_datas[10];
      for (int j = 0; j < 10; ++j) {
        seq_datas[j] =
            rnd.RandomString(kBlockSizeBytes - kApproxPerBlockOverheadBytes);
      }

      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
          ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
                        seq_datas[(key_num / 10) % 10]));
        }
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }

      // Compact everything into L1 so the dictionary (if any) is built
      // during compaction output.
      ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                            true /* disallow_trivial_move */));
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      size_t total_sst_bytes = TotalSize(1);
      if (i == kWithoutDict) {
        bytes_without_dict = total_sst_bytes;
      } else if (i == kWithDict) {
        bytes_with_dict = total_sst_bytes;
      } else if (i == kWithZSTDfinalizeDict) {
        bytes_with_zstd_finalize_dict = total_sst_bytes;
      } else if (i == kWithZSTDTrainedDict) {
        bytes_with_zstd_trained_dict = total_sst_bytes;
      }

      // Regardless of dictionary mode, all data must read back intact.
      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
      }
      if (i == kWithDict) {
        ASSERT_GT(bytes_without_dict, bytes_with_dict);
      } else if (i == kWithZSTDfinalizeDict) {
        // BUGFIX: this branch previously repeated `i == kWithZSTDTrainedDict`
        // (same condition as the branch below), leaving the finalized-dict
        // assertion unreachable.
        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_finalize_dict ||
                    bytes_without_dict > bytes_with_zstd_finalize_dict);
      } else if (i == kWithZSTDTrainedDict) {
        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_trained_dict ||
                    bytes_without_dict > bytes_with_zstd_trained_dict);
      }

      DestroyAndReopen(options);
    }
  }
}
// Verifies that kForceOptimized bottommost compaction produces a distinct
// compression dictionary per output file (one raw dictionary is captured per
// file via sync point, and adjacent dictionaries must differ).
TEST_F(DBCompressionTest, PresetCompressionDictLocality) {
  if (!ZSTD_Supported()) {
    return;
  }
  const int kNumEntriesPerFile = 1 << 10;
  const int kNumBytesPerEntry = 1 << 10;
  const int kNumFiles = 4;
  Options options = CurrentOptions();
  options.compression = kZSTD;
  options.compression_opts.max_dict_bytes = 1 << 14;
  options.compression_opts.zstd_max_train_bytes = 1 << 18;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  Random rnd(301);
  // Build kNumFiles files in L1, each holding kNumEntriesPerFile random
  // values.
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumEntriesPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
                    rnd.RandomString(kNumBytesPerEntry)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(1);
    ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
  }

  // Capture each raw dictionary the table builder writes during the
  // compaction below.
  std::vector<std::string> compression_dicts;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
      [&](void* arg) {
        compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  CompactRangeOptions compact_range_opts;
  compact_range_opts.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));

  // Expect one dictionary per output file, and no two adjacent output files
  // sharing an identical dictionary.
  ASSERT_GT(NumTableFilesAtLevel(1), 1);
  ASSERT_EQ(NumTableFilesAtLevel(1),
            static_cast<int>(compression_dicts.size()));
  for (size_t i = 1; i < compression_dicts.size(); ++i) {
    std::string& a = compression_dicts[i - 1];
    std::string& b = compression_dicts[i];
    size_t alen = a.size();
    size_t blen = b.size();
    ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
  }
}
// Builds and returns a `len`-byte string that compresses to roughly 80% of
// its original size (compression ratio 0.8), for seeding compressible data.
static std::string CompressibleString(Random* rnd, int len) {
  std::string result;
  test::CompressibleString(rnd, 0.8, len, &result);
  return result;
}
// With level_compaction_dynamic_level_bytes and a 3-entry
// compression_per_level (None, None, Snappy): while only the last level is
// populated nothing is compressed; once the LSM grows so that L3 is also
// populated, blocks get Snappy-compressed. Also validates file-metadata
// invariants and DeleteFilesInRanges on the bottom level.
TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel) {
  if (!Snappy_Supported()) {
    return;
  }
  const int kNKeys = 120;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  Random rnd(301);
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.db_write_buffer_size = 20480;
  options.write_buffer_size = 20480;
  options.max_write_buffer_number = 2;
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_stop_writes_trigger = 2;
  options.target_file_size_base = 20480;
  options.level_compaction_dynamic_level_bytes = true;
  options.max_bytes_for_level_base = 102400;
  options.max_bytes_for_level_multiplier = 4;
  options.max_background_compactions = 1;
  options.num_levels = 5;
  options.statistics = CreateDBStatistics();
  // Slots 0/1 uncompressed, slot 2 Snappy. With dynamic levels, which real
  // level maps to which slot shifts as the tree grows.
  options.compression_per_level.resize(3);
  options.compression_per_level[0] = kNoCompression;
  options.compression_per_level[1] = kNoCompression;
  options.compression_per_level[2] = kSnappyCompression;
  OnFileDeletionListener* listener = new OnFileDeletionListener();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);

  // Phase 1: small data set — files land only in L0/L4 and nothing should
  // be compressed yet.
  for (int i = 0; i < 20; i++) {
    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
  ASSERT_TRUE(NumTableFilesAtLevel(0) > 0 || NumTableFilesAtLevel(4) > 0);
  auto num_block_compressed =
      options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
  ASSERT_EQ(num_block_compressed, 0);
  ASSERT_OK(options.statistics->Reset());
  ASSERT_EQ(num_block_compressed, 0);

  // Phase 2: grow the data set so L3 becomes populated; compression should
  // now kick in (NUMBER_BLOCK_COMPRESSED > 0).
  for (int i = 20; i < 120; i++) {
    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GE(NumTableFilesAtLevel(3), 1);
  ASSERT_GE(NumTableFilesAtLevel(4), 1);
  num_block_compressed =
      options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
  ASSERT_GT(num_block_compressed, 0);

  // Freeze the LSM shape before inspecting metadata.
  ASSERT_OK(dbfull()->SetOptions({
      {"disable_auto_compactions", "true"},
  }));
  // Validate metadata invariants: per-file entry counts match key ranges,
  // and files within a level (>0) are disjoint and ordered across levels.
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(&cf_meta);
  int largestkey_in_prev_level = -1;
  int keys_found = 0;
  for (int level = (int)cf_meta.levels.size() - 1; level >= 0; level--) {
    int files_in_level = (int)cf_meta.levels[level].files.size();
    int largestkey_in_prev_file = -1;
    for (int j = 0; j < files_in_level; j++) {
      int smallestkey = IdFromKey(cf_meta.levels[level].files[j].smallestkey);
      int largestkey = IdFromKey(cf_meta.levels[level].files[j].largestkey);
      int num_entries = (int)cf_meta.levels[level].files[j].num_entries;
      ASSERT_EQ(num_entries, largestkey - smallestkey + 1);
      keys_found += num_entries;
      if (level > 0) {
        if (j == 0) {
          // First file of this level starts after the previous (deeper)
          // level's last key seen so far.
          ASSERT_GT(smallestkey, largestkey_in_prev_level);
        }
        if (j > 0) {
          ASSERT_GT(smallestkey, largestkey_in_prev_file);
        }
        if (j == files_in_level - 1) {
          largestkey_in_prev_level = largestkey;
        }
      }
      largestkey_in_prev_file = largestkey;
    }
  }
  ASSERT_EQ(keys_found, kNKeys);

  // Delete every L4 file by its exact key range; the listener checks each
  // expected file name is actually deleted.
  for (const auto& file : cf_meta.levels[4].files) {
    listener->SetExpectedFileName(dbname_ + file.name);
    const RangeOpt ranges(file.smallestkey, file.largestkey);
    EXPECT_OK(dbfull()->DeleteFilesInRanges(dbfull()->DefaultColumnFamily(),
                                            &ranges, true));
  }
  listener->VerifyMatchedCount(cf_meta.levels[4].files.size());

  // Remaining data must still be fully iterable, with L4 now empty.
  int num_keys = 0;
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    num_keys++;
  }
  ASSERT_OK(iter->status());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GE(NumTableFilesAtLevel(3), 1);
  ASSERT_EQ(NumTableFilesAtLevel(4), 0);
  // Surviving levels are uncompressed, so on-disk size should exceed the
  // logical payload (4000 bytes/value plus per-key overhead).
  ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U + num_keys * 10U);
}
// Checks, via sync points, the compression type chosen per output as the
// dynamic-level LSM grows: flush outputs are always uncompressed; while only
// L4 is populated, compactions into it use LZ4; once L3 is also populated,
// L3->L4 compactions use Zlib (last per-level slot) and the rest use LZ4.
TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel2) {
  if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
    return;
  }
  const int kNKeys = 500;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  // Insert keys in random order to spread writes across the key space.
  RandomShuffle(std::begin(keys), std::end(keys));
  Random rnd(301);
  Options options;
  options.create_if_missing = true;
  options.db_write_buffer_size = 6000000;
  options.write_buffer_size = 600000;
  options.max_write_buffer_number = 2;
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_stop_writes_trigger = 2;
  options.soft_pending_compaction_bytes_limit = 1024 * 1024;
  options.target_file_size_base = 20;
  options.env = env_;
  options.level_compaction_dynamic_level_bytes = true;
  options.max_bytes_for_level_base = 200;
  options.max_bytes_for_level_multiplier = 8;
  options.max_background_compactions = 1;
  options.num_levels = 5;
  std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
  options.table_factory = mtf;
  // Slot 0: no compression (L0/flush), slot 1: LZ4, slot 2: Zlib.
  options.compression_per_level.resize(3);
  options.compression_per_level[0] = kNoCompression;
  options.compression_per_level[1] = kLZ4Compression;
  options.compression_per_level[2] = kZlibCompression;
  DestroyAndReopen(options);

  std::atomic<int> num_zlib(0);
  std::atomic<int> num_lz4(0);
  std::atomic<int> num_no(0);
  // Phase 1: only L4 is populated — every compaction into L4 must pick LZ4.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = static_cast<Compaction*>(arg);
        if (compaction->output_level() == 4) {
          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
          num_lz4.fetch_add(1);
        }
      });
  // Flush outputs must stay uncompressed.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
        auto* compression = static_cast<CompressionType*>(arg);
        ASSERT_TRUE(*compression == kNoCompression);
        num_no.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < 100; i++) {
    std::string value = rnd.RandomString(200);
    ASSERT_OK(Put(Key(keys[i]), value));
    if (i % 25 == 24) {
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
  ASSERT_GT(NumTableFilesAtLevel(4), 0);
  ASSERT_GT(num_no.load(), 2);
  ASSERT_GT(num_lz4.load(), 0);
  int prev_num_files_l4 = NumTableFilesAtLevel(4);

  // Phase 2: more data populates L3 as well — L3->L4 compactions must now
  // pick Zlib; all other compactions still pick LZ4.
  num_lz4.store(0);
  num_no.store(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = static_cast<Compaction*>(arg);
        if (compaction->output_level() == 4 && compaction->start_level() == 3) {
          ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
          num_zlib.fetch_add(1);
        } else {
          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
          num_lz4.fetch_add(1);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
        auto* compression = static_cast<CompressionType*>(arg);
        ASSERT_TRUE(*compression == kNoCompression);
        num_no.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 101; i < 500; i++) {
    std::string value = rnd.RandomString(200);
    ASSERT_OK(Put(Key(keys[i]), value));
    if (i % 100 == 99) {
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GT(NumTableFilesAtLevel(3), 0);
  ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
  ASSERT_GT(num_no.load(), 2);
  ASSERT_GT(num_lz4.load(), 0);
  ASSERT_GT(num_zlib.load(), 0);
}
// Fixture parameterized over (compression type, bottommost flag): when
// bottommost_ is true the dictionary options are applied via
// bottommost_compression_opts instead of compression_opts.
class PresetCompressionDictTest
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
 public:
  PresetCompressionDictTest()
      : DBTestBase("db_test2", false),
        compression_type_(std::get<0>(GetParam())),
        bottommost_(std::get<1>(GetParam())) {}

 protected:
  const CompressionType compression_type_;
  const bool bottommost_;
};
// Instantiate for every compression type that supports dictionaries, crossed
// with bottommost-only vs. whole-DB dictionary options.
INSTANTIATE_TEST_CASE_P(
    DBCompressionTest, PresetCompressionDictTest,
    ::testing::Combine(::testing::ValuesIn(GetSupportedDictCompressions()),
                       ::testing::Bool()));
// Dictionary behavior on flush: a flush output is not a bottommost file, so
// with bottommost-only dictionary options no dictionary block should reach
// the block cache; with whole-DB options exactly one dictionary block is
// added, with a bounded cache charge.
TEST_P(PresetCompressionDictTest, Flush) {
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  // Auto-flush after kKeysPerFile entries.
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(kKeysPerFile));
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  // Cache dictionary/index/filter blocks so the tickers below observe the
  // dictionary insertion.
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  // One extra key (<=) to push the memtable over the flush threshold.
  for (size_t i = 0; i <= kKeysPerFile; ++i) {
    ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  if (bottommost_) {
    // Flush output (L0) is not bottommost: no dictionary expected.
    ASSERT_EQ(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
  } else {
    ASSERT_GT(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
    ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD), 1);
    // Looser size bound for ZSTD dictionaries.
    ASSERT_LE(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        (compression_type_ == kZSTD ? 10 : 2) * kBlockLen);
  }
}
// Dictionary behavior in a non-bottommost compaction: compact L0 into L1
// while a file already sits in L2. Bottommost-only dictionary options must
// produce no dictionary; whole-DB options produce exactly one dictionary
// block with a bounded cache charge.
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  options.disable_auto_compactions = true;
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  // Seed one file in L2 so the later L0 compaction's output (L1) is not the
  // bottommost level.
  for (size_t j = 0; j <= kKeysPerFile; ++j) {
    ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);
  // Two overlapping files in L0 to compact.
  for (int i = 0; i < 2; ++i) {
    for (size_t j = 0; j <= kKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("2,0,1", FilesPerLevel(0));

  // Drop counts accumulated during setup; measure only the compaction below.
  PopTicker(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
  PopTicker(options, BLOCK_CACHE_COMPRESSION_DICT_ADD);
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_EQ("0,1,1", FilesPerLevel(0));
  if (bottommost_) {
    // Output level (L1) is not bottommost: no dictionary expected.
    ASSERT_EQ(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
  } else {
    ASSERT_GT(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
    ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD), 1);
    // Looser size bound for ZSTD dictionaries.
    ASSERT_LE(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        (compression_type_ == kZSTD ? 10 : 2) * kBlockLen);
  }
}
// Dictionary behavior in a bottommost compaction: the compaction output IS
// the bottommost level, so both parameter modes (whole-DB or bottommost-only
// dictionary options) must produce exactly one dictionary block.
TEST_P(PresetCompressionDictTest, CompactBottommost) {
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  options.disable_auto_compactions = true;
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  // Two overlapping L0 files so the CompactRange below does real work.
  for (int i = 0; i < 2; ++i) {
    for (size_t j = 0; j <= kKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("2", FilesPerLevel(0));

  // Drop counts accumulated during setup; measure only the compaction below.
  PopTicker(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
  PopTicker(options, BLOCK_CACHE_COMPRESSION_DICT_ADD);
  CompactRangeOptions cro;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));
  ASSERT_GT(
      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
      0);
  ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD), 1);
  // Looser size bound for ZSTD dictionaries.
  ASSERT_LE(
      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
      (compression_type_ == kZSTD ? 10 : 2) * kBlockLen);
}
// EventListener asserting that every completed compaction wrote its output
// with the compression type the DB options dictate: the bottommost override
// when set and the output is the deepest non-empty level, otherwise the
// per-level entry, otherwise the default compression. Also records the
// deepest output level observed in max_level_checked.
class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Locate the deepest level that currently holds at least one file.
    int last_nonempty_level = 0;
    for (int lvl = 0; lvl < db->NumberLevels(); lvl++) {
      std::string level_file_count;
      ASSERT_TRUE(
          db->GetProperty("rocksdb.num-files-at-level" + std::to_string(lvl),
                          &level_file_count));
      if (level_file_count != "0") {
        last_nonempty_level = lvl;
      }
    }

    const bool bottommost_override_set =
        db_options_->bottommost_compression != kDisableCompressionOption;
    if (bottommost_override_set && ci.output_level == last_nonempty_level) {
      // Bottommost override applies to outputs landing in the last level.
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (!db_options_->compression_per_level.empty()) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }
    max_level_checked = std::max(max_level_checked, ci.output_level);
  }

  int max_level_checked = 0;
  const Options* db_options_;
};
// Failure modes injected via sync points in CompressionFailuresTest.
enum CompressionFailureType {
  kTestCompressionFail,          // compression result forced to kNoCompression
  kTestDecompressionFail,        // decompression forced to return Corruption
  kTestDecompressionCorruption,  // decompression output replaced with junk
  kTestStartOfFinishFail,        // parallel-compression IO status seeded bad
};
// Fixture parameterized over (failure type, compression type,
// max_dict_bytes, parallel_threads).
class CompressionFailuresTest
    : public DBCompressionTest,
      public testing::WithParamInterface<std::tuple<
          CompressionFailureType, CompressionType, uint32_t, uint32_t>> {
 public:
  CompressionFailuresTest() {
    std::tie(compression_failure_type_, compression_type_,
             compression_max_dict_bytes_, compression_parallel_threads_) =
        GetParam();
  }

  CompressionFailureType compression_failure_type_ = kTestCompressionFail;
  CompressionType compression_type_ = kNoCompression;
  uint32_t compression_max_dict_bytes_ = 0;
  uint32_t compression_parallel_threads_ = 0;
};
// All four failure modes x every supported compression type x
// {no dict, 10-byte dict} x {1 thread, 4 threads}.
INSTANTIATE_TEST_CASE_P(
    DBCompressionTest, CompressionFailuresTest,
    ::testing::Combine(::testing::Values(kTestCompressionFail,
                                         kTestDecompressionFail,
                                         kTestDecompressionCorruption,
                                         kTestStartOfFinishFail),
                       ::testing::ValuesIn(GetSupportedCompressions()),
                       ::testing::Values(0, 10), ::testing::Values(1, 4)));
// Injects failures at different points of the (de)compression pipeline via
// sync points and checks how they surface: a compression-side failure falls
// back to storing blocks uncompressed (all data stays readable), while the
// decompression/parallel-finish failures surface as Corruption statuses with
// specific messages.
TEST_P(CompressionFailuresTest, CompressionFailures) {
  if (compression_type_ == kNoCompression) {
    return;  // Nothing to fail when compression is off.
  }
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;
  options.target_file_size_base = 512;
  BlockBasedTableOptions table_options;
  table_options.block_size = 512;
  // Re-read each block at write time so write-path tampering is detected.
  table_options.verify_compression = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.compression = compression_type_;
  options.compression_opts.parallel_threads = compression_parallel_threads_;
  options.compression_opts.max_dict_bytes = compression_max_dict_bytes_;
  options.bottommost_compression_opts.parallel_threads =
      compression_parallel_threads_;
  options.bottommost_compression_opts.max_dict_bytes =
      compression_max_dict_bytes_;

  if (compression_failure_type_ == kTestCompressionFail) {
    // Pretend compression declined, i.e. blocks are stored uncompressed.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTableBuilder::CompressAndVerifyBlock:TamperWithResultType",
        [](void* arg) {
          CompressionType* ret = static_cast<CompressionType*>(arg);
          *ret = kNoCompression;
        });
  } else if (compression_failure_type_ == kTestDecompressionFail) {
    // Force decompression to report a Corruption status.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DecompressBlockData:TamperWithReturnValue", [](void* arg) {
          Status* ret = static_cast<Status*>(arg);
          ASSERT_OK(*ret);
          *ret = Status::Corruption("kTestDecompressionFail");
        });
  } else if (compression_failure_type_ == kTestDecompressionCorruption) {
    // Let decompression "succeed" but hand back a zero-filled buffer of the
    // wrong length.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DecompressBlockData:TamperWithDecompressionOutput", [](void* arg) {
          BlockContents* contents = static_cast<BlockContents*>(arg);
          const size_t len = contents->data.size() + 1;
          std::unique_ptr<char[]> fake_data(new char[len]());
          *contents = BlockContents(std::move(fake_data), len);
        });
  } else if (compression_failure_type_ == kTestStartOfFinishFail) {
    if (compression_parallel_threads_ <= 1) {
      return;  // Only meaningful with parallel compression.
    }
    // Seed a failed IO status observed in Finish() of the parallel path.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTableBuilder::Finish:ParallelIOStatus", [&](void* arg) {
          *static_cast<IOStatus*>(arg) = IOStatus::Corruption("Seeded failure");
        });
  } else {
    abort();  // Unknown parameter value — test bug.
  }

  std::map<std::string, std::string> key_value_written;
  const int kKeySize = 5;
  const int kValUnitSize = 16;
  const int kValSize = 256;
  Random rnd(405);
  Status s = Status::OK();
  DestroyAndReopen(options);
  // Values repeat a 16-byte unit so every block is compressible.
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 5; j++) {
      std::string key = rnd.RandomString(kKeySize);
      std::string valueUnit = rnd.RandomString(kValUnitSize);
      std::string value;
      for (int k = 0; k < kValSize; k += kValUnitSize) {
        value += valueUnit;
      }
      s = Put(key, value);
      if (compression_failure_type_ == kTestCompressionFail) {
        // Only in this mode are all operations expected to succeed, so only
        // here do we track expected contents.
        key_value_written[key] = value;
        ASSERT_OK(s);
      }
    }
    s = Flush();
    if (compression_failure_type_ == kTestCompressionFail) {
      ASSERT_OK(s);
    }
    s = dbfull()->TEST_WaitForCompact();
    if (compression_failure_type_ == kTestCompressionFail) {
      ASSERT_OK(s);
    }
    // Arm the injection only after half the workload has completed cleanly.
    if (i == 4) {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    }
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  auto st = s.getState();
  if (compression_failure_type_ == kTestCompressionFail) {
    // Blocks were simply stored uncompressed; every key/value must be
    // present and intact.
    std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
      std::string key = db_iter->key().ToString();
      std::string value = db_iter->value().ToString();
      ASSERT_NE(key_value_written.find(key), key_value_written.end());
      ASSERT_EQ(key_value_written[key], value);
      key_value_written.erase(key);
    }
    ASSERT_OK(db_iter->status());
    ASSERT_EQ(0, key_value_written.size());
  } else if (compression_failure_type_ == kTestDecompressionFail) {
    ASSERT_EQ(s.code(), Status::kCorruption);
    ASSERT_NE(st, nullptr);
    ASSERT_EQ(std::string(st), "Could not decompress: kTestDecompressionFail");
  } else if (compression_failure_type_ == kTestDecompressionCorruption) {
    ASSERT_EQ(s.code(), Status::kCorruption);
    ASSERT_NE(st, nullptr);
    ASSERT_EQ(std::string(st),
              "Decompressed block did not match pre-compression block");
  } else if (compression_failure_type_ == kTestStartOfFinishFail) {
    ASSERT_EQ(s.code(), Status::kCorruption);
    ASSERT_NE(st, nullptr);
    ASSERT_EQ(std::string(st), "Seeded failure");
  }
}
// Exercises the three ways of specifying compaction output compression
// (per-level list, single default, bottommost override) across serial and
// parallel compression; CompactionCompressionListener asserts the type used
// by each compaction, and all data must read back intact.
TEST_F(DBCompressionTest, CompressionOptions) {
  if (!Zlib_Supported() || !Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;
  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);
  std::vector<uint32_t> compression_parallel_threads = {1, 4};
  std::map<std::string, std::string> key_value_written;
  // iter 0: per-level list + bottommost override.
  // iter 1: single default type + bottommost override.
  // iter 2: single default type, bottommost override disabled.
  for (int iter = 0; iter <= 2; iter++) {
    listener->max_level_checked = 0;
    if (iter == 0) {
      options.compression_per_level = {kNoCompression, kNoCompression,
                                       kNoCompression, kSnappyCompression,
                                       kSnappyCompression, kSnappyCompression,
                                       kZlibCompression};
      options.compression = kNoCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 1) {
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 2) {
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kDisableCompressionOption;
    }
    for (auto num_threads : compression_parallel_threads) {
      options.compression_opts.parallel_threads = num_threads;
      options.bottommost_compression_opts.parallel_threads = num_threads;
      DestroyAndReopen(options);
      // Write enough, flushing each round, to drive compactions down to the
      // last level (verified below via max_level_checked).
      for (int i = 0; i < 10; i++) {
        for (int j = 0; j < 5; j++) {
          std::string key = rnd.RandomString(kKeySize);
          std::string value = rnd.RandomString(kValSize);
          key_value_written[key] = value;
          ASSERT_OK(Put(key, value));
        }
        ASSERT_OK(Flush());
        ASSERT_OK(dbfull()->TEST_WaitForCompact());
      }
      ASSERT_EQ(listener->max_level_checked, 6);
      // Every written key/value must read back intact.
      std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
      for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
        std::string key = db_iter->key().ToString();
        std::string value = db_iter->value().ToString();
        ASSERT_NE(key_value_written.find(key), key_value_written.end());
        ASSERT_EQ(key_value_written[key], value);
        key_value_written.erase(key);
      }
      ASSERT_OK(db_iter->status());
      ASSERT_EQ(0, key_value_written.size());
    }
  }
}
// Round-trip smoke test for RoundRobinManager wrapping the built-in
// compression manager: values written (mostly compressible, one binary)
// must read back byte-for-byte before and after flush.
TEST_F(DBCompressionTest, RoundRobinManager) {
  if (ZSTD_Supported()) {
    auto mgr =
        std::make_shared<RoundRobinManager>(GetBuiltinV2CompressionManager());
    std::vector<std::string> values;
    // NOTE(review): only the wrapper case is exercised here even though the
    // trace message suggests a no-wrapper baseline was once intended.
    for (bool use_wrapper : {true}) {
      SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
      Options options = CurrentOptions();
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = false;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      Random rnd(301);
      constexpr int kCount = 13;
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // One incompressible binary value among compressible ones.
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
        }
        values.push_back(value);
        ASSERT_OK(Put(Key(i), value));
        ASSERT_EQ(Get(Key(i)), value);
      }
      ASSERT_OK(Flush());
      // Re-read all values after flush (through the table reader path).
      for (int i = 0; i < kCount; ++i) {
        ASSERT_NE(Get(Key(i)), "NOT_FOUND");
        ASSERT_EQ(Get(Key(i)), values[i]);
      }
      ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
    }
  }
}
// Round-trip smoke test for RandomMixedCompressionManager wrapping the
// built-in compression manager; mirrors the RoundRobinManager test above.
TEST_F(DBCompressionTest, RandomMixedCompressionManager) {
  if (ZSTD_Supported()) {
    auto mgr = std::make_shared<RandomMixedCompressionManager>(
        GetBuiltinV2CompressionManager());
    std::vector<std::string> values;
    // NOTE(review): only the wrapper case is exercised here even though the
    // trace message suggests a no-wrapper baseline was once intended.
    for (bool use_wrapper : {true}) {
      SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
      Options options = CurrentOptions();
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = false;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      Random rnd(301);
      constexpr int kCount = 13;
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // One incompressible binary value among compressible ones.
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
        }
        values.push_back(value);
        ASSERT_OK(Put(Key(i), value));
        ASSERT_EQ(Get(Key(i)), value);
      }
      ASSERT_OK(Flush());
      // Re-read all values after flush (through the table reader path).
      for (int i = 0; i < kCount; ++i) {
        ASSERT_NE(Get(Key(i)), "NOT_FOUND");
        ASSERT_EQ(Get(Key(i)), values[i]);
      }
      ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
    }
  }
}
namespace {
// Structurally validates the uncompressed bytes of a RocksDB block as seen by
// a compressor: footer, restart array, and each key-value entry's varints.
// kIndexBlockV4 selects the index-block (format v4) entry layout; otherwise
// the data-block layout is assumed. `use_separated_kv` selects the layout
// where values live in a separate section after the key entries.
// Returns Corruption with a descriptive message on the first malformed field.
template <bool kIndexBlockV4>
static Status ValidateRocksBlock(Slice data, bool use_separated_kv,
                                 uint64_t restart_interval) {
  // Index blocks never use the separated key-value layout here.
  assert(!kIndexBlockV4 || !use_separated_kv);
  const char* src = data.data();
  size_t srcSize = data.size();
  const char* const block_type_str =
      kIndexBlockV4 ? "Index block" : "Data block";
  Slice input(src, srcSize);
  DataBlockFooter footer;
  // DecodeFrom trims the footer bytes off the end of `input`.
  Status s = footer.DecodeFrom(&input);
  if (!s.ok()) {
    return Status::Corruption(std::string(block_type_str) + " too small");
  }
  uint32_t numRestarts = footer.num_restarts;
  // Each restart is 4 bytes, so more than srcSize/4 of them cannot fit.
  if (numRestarts > srcSize / 4 || numRestarts == 0) {
    return Status::Corruption(std::string("Invalid num_restarts in ") +
                              block_type_str);
  }
  size_t restartsSize = numRestarts * sizeof(uint32_t);
  if (input.size() < restartsSize) {
    return Status::Corruption(std::string(block_type_str) +
                              " too small for restarts array");
  }
  size_t entriesSize;
  uint32_t values_section_offset = 0;
  if (footer.separated_kv) {
    // Entries end where the values section begins.
    values_section_offset = footer.values_section_offset;
    entriesSize = values_section_offset;
  } else {
    entriesSize = input.size() - restartsSize;
  }
  const char* entriesEnd = src + entriesSize;
  const char* p = src;
  uint32_t cur_idx = 0;
  // Tracks the most recent value slice in separated-KV mode; consecutive
  // non-restart values are laid out back-to-back after the restart value.
  Slice current_value;
  while (p < entriesEnd) {
    // Entry header: shared key bytes, unshared key bytes, then (data blocks
    // only) the value length.
    uint32_t shared;
    const char* next = GetVarint32Ptr(p, entriesEnd, &shared);
    if (next == nullptr) {
      return Status::Corruption(std::string("Invalid shared_bytes varint in ") +
                                block_type_str);
    }
    p = next;
    uint32_t unshared;
    next = GetVarint32Ptr(p, entriesEnd, &unshared);
    if (next == nullptr) {
      return Status::Corruption(
          std::string("Invalid unshared_bytes varint in ") + block_type_str);
    }
    p = next;
    uint32_t valueLen = 0;
    if constexpr (!kIndexBlockV4) {
      next = GetVarint32Ptr(p, entriesEnd, &valueLen);
      if (next == nullptr) {
        return Status::Corruption(
            std::string("Invalid value_length varint in ") + block_type_str);
      }
      p = next;
    }
    uint32_t value_offset = 0;
    // In separated-KV mode, restart-point entries carry an explicit offset
    // into the values section.
    if (cur_idx % restart_interval == 0 && use_separated_kv) {
      next = GetVarint32Ptr(p, entriesEnd, &value_offset);
      if (next == nullptr) {
        return Status::Corruption(
            std::string("Invalid value_offset varint in ") + block_type_str);
      }
      p = next;
    }
    if (p + unshared > entriesEnd) {
      return Status::Corruption(
          std::string("Key delta exceeds end of entries in ") + block_type_str);
    }
    p += unshared;
    if constexpr (kIndexBlockV4) {
      // Index v4 entry value: one varint, plus a second one when the key
      // shares no prefix with its predecessor (shared == 0).
      uint32_t v1;
      next = GetVarint32Ptr(p, entriesEnd, &v1);
      if (next == nullptr) {
        return Status::Corruption(std::string("Invalid value varint in ") +
                                  block_type_str);
      }
      p = next;
      if (shared == 0) {
        uint32_t v2;
        next = GetVarint32Ptr(p, entriesEnd, &v2);
        if (next == nullptr) {
          return Status::Corruption(
              std::string("Invalid second value varint in ") + block_type_str);
        }
        p = next;
      }
    } else {
      if (!use_separated_kv) {
        // Inline value follows the key delta directly.
        if (p + valueLen > entriesEnd) {
          return Status::Corruption(
              std::string("Value exceeds end of entries in ") + block_type_str);
        }
        p += valueLen;
      } else {
        // Separated-KV: locate this entry's value in the values section and
        // check it fits before the restart array.
        if (cur_idx % restart_interval == 0) {
          current_value =
              Slice(src + values_section_offset + value_offset, valueLen);
        } else {
          current_value =
              Slice(current_value.data() + current_value.size(), valueLen);
        }
        if (current_value.data() + current_value.size() >
            src + srcSize - restartsSize) {
          return Status::Corruption(
              std::string("Value exceeds values section in ") + block_type_str);
        }
      }
    }
    ++cur_idx;
  }
  return Status::OK();
}
}  // namespace
// Parameterized fixture: (parallel compression threads, use dictionary,
// separate key-value layout in data blocks).
class DBCompressionTestMaybeParallel
    : public DBCompressionTest,
      public testing::WithParamInterface<std::tuple<int, bool, bool>> {
 public:
  DBCompressionTestMaybeParallel() {
    // Unpack the parameter tuple once at construction.
    const auto& param = GetParam();
    parallel_threads_ = std::get<0>(param);
    use_dict_ = std::get<1>(param);
    separate_kv_ = std::get<2>(param);
  }

 protected:
  int parallel_threads_;
  bool use_dict_;
  bool separate_kv_;
};
// Instantiates the parameterized tests over the cross product of
// (parallel_threads in {1, 4}) x (use_dict) x (separate_kv).
INSTANTIATE_TEST_CASE_P(DBCompressionTest, DBCompressionTestMaybeParallel,
                        ::testing::Combine(::testing::Values(1, 4),
                                           ::testing::Values(false, true),
                                           ::testing::Values(false, true)));
TEST_P(DBCompressionTestMaybeParallel, CompressionManagerWrapper) {
  // Sentinel substrings embedded into values to make MyCompressor (below)
  // bypass or reject compression for the blocks containing them.
  static std::string kDoNotCompress = "do_not_compress";
  static std::string kRejectCompression = "reject_compression";
  // Static so the locally-defined compressor structs can update them.
  static RelaxedAtomic<int> dataCheckedCount{0};
  static RelaxedAtomic<int> indexCheckedCount{0};
  static RelaxedAtomic<int> compressCalledCount{0};
  static bool useSeparatedKV = false;
  useSeparatedKV = separate_kv_;
  // Verifies every data block handed to the compressor parses as a
  // well-formed RocksDB data block before delegating to the real compressor.
  struct CheckDataBlockCompressorWrapper : public CompressorWrapper {
    using CompressorWrapper::CompressorWrapper;
    const char* Name() const override { return "CheckDataBlockCompressor"; }
    std::unique_ptr<Compressor> Clone() const override {
      return std::make_unique<CheckDataBlockCompressorWrapper>(
          wrapped_->Clone());
    }
    Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                         size_t* compressed_output_size,
                         CompressionType* out_compression_type,
                         ManagedWorkingArea* working_area) override {
      dataCheckedCount.FetchAddRelaxed(1);
      Status s = ValidateRocksBlock<false>(
          uncompressed_data, useSeparatedKV,
          BlockBasedTableOptions().block_restart_interval);
      if (!s.ok()) {
        return s;
      }
      return wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                     compressed_output_size,
                                     out_compression_type, working_area);
    }
  };
  // Same, but validates index blocks (never separated-KV).
  struct CheckIndexBlockCompressorWrapper : public CompressorWrapper {
    using CompressorWrapper::CompressorWrapper;
    const char* Name() const override { return "CheckIndexBlockCompressor"; }
    std::unique_ptr<Compressor> Clone() const override {
      return std::make_unique<CheckIndexBlockCompressorWrapper>(
          wrapped_->Clone());
    }
    Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                         size_t* compressed_output_size,
                         CompressionType* out_compression_type,
                         ManagedWorkingArea* working_area) override {
      indexCheckedCount.FetchAddRelaxed(1);
      Status s = ValidateRocksBlock<true>(
          uncompressed_data, false,
          BlockBasedTableOptions().index_block_restart_interval);
      if (!s.ok()) {
        return s;
      }
      return wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                     compressed_output_size,
                                     out_compression_type, working_area);
    }
  };
  // Counts compression calls and forces bypass/rejection for blocks that
  // contain the sentinel substrings.
  struct MyCompressor : public CompressorWrapper {
    using CompressorWrapper::CompressorWrapper;
    const char* Name() const override { return "MyCompressor"; }
    std::unique_ptr<Compressor> Clone() const override {
      return std::make_unique<MyCompressor>(wrapped_->Clone());
    }
    Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                         size_t* compressed_output_size,
                         CompressionType* out_compression_type,
                         ManagedWorkingArea* working_area) override {
      compressCalledCount.FetchAddRelaxed(1);
      auto begin = uncompressed_data.data();
      auto end = uncompressed_data.data() + uncompressed_data.size();
      if (std::search(begin, end, kDoNotCompress.begin(),
                      kDoNotCompress.end()) != end) {
        // Output size 0: block kept uncompressed (counted as bypassed).
        *compressed_output_size = 0;
        EXPECT_EQ(*out_compression_type, kNoCompression);
        return Status::OK();
      } else if (std::search(begin, end, kRejectCompression.begin(),
                             kRejectCompression.end()) != end) {
        // Size 1 with type left at kNoCompression -- presumably counted as
        // a rejected compression attempt; confirm against the stats below.
        *compressed_output_size = 1;
        EXPECT_EQ(*out_compression_type, kNoCompression);
        return Status::OK();
      } else {
        return wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                       compressed_output_size,
                                       out_compression_type, working_area);
      }
    }
    // Wraps the underlying working area to exercise the obtain/release path.
    struct MyWorkingArea : public WorkingArea {
      explicit MyWorkingArea(ManagedWorkingArea&& wrapped)
          : wrapped_(std::move(wrapped)) {}
      ManagedWorkingArea wrapped_;
    };
    ManagedWorkingArea ObtainWorkingArea() override {
      ManagedWorkingArea rv{
          new MyWorkingArea{CompressorWrapper::ObtainWorkingArea()}, this};
      if (GetPreferredCompressionType() == kZSTD) {
        assert(rv.get() != nullptr);
      }
      return rv;
    }
    void ReleaseWorkingArea(WorkingArea* wa) override {
      delete static_cast<MyWorkingArea*>(wa);
    }
    // Ensures specialized clones stay wrapped, and layers on the
    // block-validating wrappers for data and index blocks.
    std::unique_ptr<Compressor> MaybeCloneSpecialized(
        CacheEntryRole block_type,
        DictConfigArgs&& dict_config) const override {
      std::unique_ptr<Compressor> result = std::make_unique<MyCompressor>(
          wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_config)));
      if (block_type == CacheEntryRole::kDataBlock) {
        result = std::make_unique<CheckDataBlockCompressorWrapper>(
            std::move(result));
      } else if (block_type == CacheEntryRole::kIndexBlock) {
        result = std::make_unique<CheckIndexBlockCompressorWrapper>(
            std::move(result));
      }
      return result;
    }
  };
  struct MyManager : public CompressionManagerWrapper {
    using CompressionManagerWrapper::CompressionManagerWrapper;
    const char* Name() const override { return "MyManager"; }
    std::unique_ptr<Compressor> GetCompressorForSST(
        const FilterBuildingContext& context, const CompressionOptions& opts,
        CompressionType preferred) override {
      return std::make_unique<MyCompressor>(
          wrapped_->GetCompressorForSST(context, opts, preferred));
    }
  };
  auto mgr = std::make_shared<MyManager>(GetBuiltinV2CompressionManager());
  for (CompressionType type : GetSupportedCompressions()) {
    for (bool use_wrapper : {false, true}) {
      if (type == kNoCompression) {
        continue;
      }
      SCOPED_TRACE("Compression type: " + std::to_string(type) +
                   (use_wrapper ? " with " : " no ") + "wrapper" +
                   (separate_kv_ ? " separated_kv" : ""));
      Options options = CurrentOptions();
      options.compression = type;
      options.compression_opts.parallel_threads = parallel_threads_;
      options.compression_opts.max_dict_bytes = use_dict_ ? 4096 : 0;
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = true;
      bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
      bbto.partition_filters = true;
      bbto.filter_policy.reset(NewBloomFilterPolicy(5));
      bbto.separate_key_value_in_data_block = separate_kv_;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      auto PopStat = [&](Tickers t) -> uint64_t {
        return options.statistics->getAndResetTickerCount(t);
      };
      Random rnd(301);
      constexpr int kCount = 13;
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // Incompressible value.
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
          // Even keys carry the bypass sentinel; key 7 the reject sentinel.
          if ((i % 2) == 0) {
            value += kDoNotCompress;
          } else if (i == 7) {
            value += kRejectCompression;
          }
        }
        ASSERT_OK(Put(Key(i), value));
      }
      ASSERT_OK(Flush());
      // One index block is compressed and one rejected in each config.
      constexpr int kIdxComp = 1;
      constexpr int kIdxRej = 1;
      if (use_dict_) {
        // No stat expectations with dictionary enabled.
      } else if (use_wrapper) {
        EXPECT_EQ(kCount / 2 - 1 + kIdxComp, PopStat(NUMBER_BLOCK_COMPRESSED));
        EXPECT_EQ(kCount / 2, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
        EXPECT_EQ(1 + 1 + kIdxRej, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
      } else {
        EXPECT_EQ(kCount - 1 + kIdxComp, PopStat(NUMBER_BLOCK_COMPRESSED));
        EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
        EXPECT_EQ(1 + kIdxRej, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
      }
      for (int i = 0; i < kCount; ++i) {
        ASSERT_NE(Get(Key(i)), "NOT_FOUND");
      }
      ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
      // The checking wrappers must have seen every data/index block, but
      // only when the wrapper manager was installed.
      EXPECT_EQ(indexCheckedCount.ExchangeRelaxed(0),
                use_wrapper ? kIdxComp + kIdxRej : 0);
      EXPECT_EQ(dataCheckedCount.ExchangeRelaxed(0), use_wrapper ? kCount : 0);
      EXPECT_EQ(compressCalledCount.ExchangeRelaxed(0),
                use_wrapper ? kIdxComp + kIdxRej + kCount : 0);
    }
  }
}
namespace {
std::string UniqueName(const std::string& base) {
static RelaxedAtomic<int> counter{0};
return base + std::to_string(counter.FetchAddRelaxed(1));
}
}
TEST_P(DBCompressionTestMaybeParallel, CompressionManagerCustomCompression) {
  // Custom algorithms registered on experimental CompressionType code points
  // (0x8A..0x8C) provided by the test utilities.
  using Compressor8A = test::CompressorCustomAlg<kCustomCompression8A>;
  using Compressor8B = test::CompressorCustomAlg<kCustomCompression8B>;
  using Compressor8C = test::CompressorCustomAlg<kCustomCompression8C>;
  if (!Compressor8A::Supported() || !LZ4_Supported()) {
    fprintf(stderr,
            "Prerequisite compression library not supported. Skipping\n");
    return;
  }
  // Manager supporting the custom types plus all built-in types, with a
  // configurable compatibility name so the test can exercise the
  // compatibility-matching logic at file-open time.
  class MyManager : public CompressionManager {
   public:
    explicit MyManager(const std::string& compat_name)
        : compat_name_(compat_name), name_("MyManager:" + compat_name_) {}
    const char* Name() const override { return name_.c_str(); }
    const char* CompatibilityName() const override {
      return compat_name_.c_str();
    }
    bool SupportsCompressionType(CompressionType type) const override {
      return type == kCustomCompression8A || type == kCustomCompression8B ||
             type == kCustomCompression8C ||
             GetBuiltinV2CompressionManager()->SupportsCompressionType(type);
    }
    // How many times each custom compressor has been instantiated.
    int used_compressor8A_count_ = 0;
    int used_compressor8B_count_ = 0;
    int used_compressor8C_count_ = 0;
    std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
                                              CompressionType type) override {
      switch (static_cast<unsigned char>(type)) {
        case kCustomCompression8A:
          used_compressor8A_count_++;
          return std::make_unique<Compressor8A>();
        case kCustomCompression8B:
          used_compressor8B_count_++;
          return std::make_unique<Compressor8B>();
        case kCustomCompression8C:
          used_compressor8C_count_++;
          return std::make_unique<Compressor8C>();
        default:
          // Built-in types fall through to the built-in manager.
          return GetBuiltinV2CompressionManager()->GetCompressor(opts, type);
      }
    }
    std::shared_ptr<Decompressor> GetDecompressor() override {
      return std::make_shared<test::DecompressorCustomAlg>();
    }
    // Records the first type of the most recent GetDecompressorForTypes call
    // so the test can verify which type a file's decompressor targeted.
    RelaxedAtomic<CompressionType> last_specific_decompressor_type_{
        kNoCompression};
    std::shared_ptr<Decompressor> GetDecompressorForTypes(
        const CompressionType* types_begin,
        const CompressionType* types_end) override {
      assert(types_end > types_begin);
      last_specific_decompressor_type_.StoreRelaxed(*types_begin);
      auto decomp = std::make_shared<test::DecompressorCustomAlg>();
      decomp->SetAllowedTypes(types_begin, types_end);
      return decomp;
    }
    // Registers another manager as an acceptable fallback for files written
    // under its compatibility name.
    void AddFriend(const std::shared_ptr<CompressionManager>& mgr) {
      friends_[mgr->CompatibilityName()] = mgr;
    }
    std::shared_ptr<CompressionManager> FindCompatibleCompressionManager(
        Slice compatibility_name) override {
      std::shared_ptr<CompressionManager> rv =
          CompressionManager::FindCompatibleCompressionManager(
              compatibility_name);
      if (!rv) {
        auto it = friends_.find(compatibility_name.ToString());
        if (it != friends_.end()) {
          return it->second.lock();
        }
      }
      return rv;
    }

   private:
    std::string compat_name_;
    std::string name_;
    // weak_ptr so friendship does not extend manager lifetime.
    std::map<std::string, std::weak_ptr<CompressionManager>> friends_;
  };
  auto mgr_foo = std::make_shared<MyManager>("Foo");
  auto mgr_bar = std::make_shared<MyManager>(UniqueName("Bar"));
  // Claims the built-in manager's compatibility name.
  auto mgr_claim_compatible = std::make_shared<MyManager>("BuiltinV2");
  constexpr uint16_t kValueSize = 10000;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 20;
  BlockBasedTableOptions bbto;
  bbto.enable_index_compression = false;
  // format_version 6 predates custom compression support.
  bbto.format_version = 6;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.compression_manager = mgr_claim_compatible;
  options.compression = kLZ4Compression;
  options.compression_opts.max_dict_bytes = use_dict_ ? kValueSize / 2 : 0;
  options.compression_opts.parallel_threads = parallel_threads_;
  DestroyAndReopen(options);
  Random rnd(404);
  std::string value;
  ASSERT_OK(Put("a", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  ASSERT_OK(Flush());
  // File written under a BuiltinV2-compatible manager must be readable with
  // no manager configured.
  options.compression_manager = nullptr;
  Reopen(options);
  ASSERT_EQ(Get("a"), value);
  Range r = {"a", "a0"};
  TablePropertiesCollection tables_properties;
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  EXPECT_EQ(tables_properties.begin()->second->compression_name, "LZ4");
  // Invalid configurations must be rejected at (re)open:
  // custom type with format_version 6 ...
  options.compression_manager = mgr_claim_compatible;
  options.compression = kCustomCompression8A;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
  // ... custom type with no manager ...
  options.compression_manager = nullptr;
  options.compression = kCustomCompressionFE;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
  // ... an out-of-range type ...
  options.compression =
      static_cast<CompressionType>(kLastBuiltinCompression + 1);
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
  // ... and a non-compatible manager with format_version 6.
  options.compression_manager = mgr_foo;
  options.compression = kLZ4Compression;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
  // format_version 7 enables custom compression managers.
  bbto.format_version = 7;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.compression_manager = mgr_claim_compatible;
  options.compression = kCustomCompression8B;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
  options.compression_manager = mgr_foo;
  // A custom type the manager does not support.
  options.compression = kCustomCompressionF0;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
  options.compression = kLZ4Compression;
  Reopen(options);
  ASSERT_OK(Put("b", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
  ASSERT_EQ(Get("b"), value);
  r = {"b", "b0"};
  tables_properties.clear();
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  // Compression name records the manager's compatibility name plus the
  // type's hex code.
  EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
  EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
            kLZ4Compression);
  options.compression = kCustomCompression8A;
  Reopen(options);
  ASSERT_OK(Put("c", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  // The 8A compressor is only instantiated when a file is actually written.
  EXPECT_EQ(mgr_foo->used_compressor8A_count_, 0);
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 3);
  ASSERT_EQ(Get("c"), value);
  EXPECT_EQ(mgr_foo->used_compressor8A_count_, 1);
  r = {"c", "c0"};
  tables_properties.clear();
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8A;");
  EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
            kCustomCompression8A);
  // Dynamic option changes between built-in and custom types.
  ASSERT_OK(dbfull()->SetOptions({{"compression", "kLZ4Compression"}}));
  ASSERT_OK(Put("d", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 4);
  ASSERT_EQ(Get("d"), value);
  r = {"d", "d0"};
  tables_properties.clear();
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
  EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
            kLZ4Compression);
  ASSERT_OK(dbfull()->SetOptions({{"compression", "kCustomCompression8B"}}));
  ASSERT_OK(Put("e", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 5);
  ASSERT_EQ(Get("e"), value);
  r = {"e", "e0"};
  tables_properties.clear();
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8B;");
  EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
            kCustomCompression8B);
  // "Bar" cannot read "Foo" files until it is registered as a friend.
  options.compression_manager = mgr_bar;
  options.compression = kLZ4Compression;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
  mgr_bar->AddFriend(mgr_foo);
  Reopen(options);
  ASSERT_EQ(Get("a").size(), kValueSize);
  ASSERT_EQ(Get("b").size(), kValueSize);
  ASSERT_EQ(Get("c").size(), kValueSize);
  ASSERT_EQ(Get("d").size(), kValueSize);
  ASSERT_EQ(Get("e").size(), kValueSize);
  ASSERT_OK(Put("f", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 6);
  ASSERT_EQ(Get("f"), value);
  r = {"f", "f0"};
  tables_properties.clear();
  ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r, 1,
                                              &tables_properties));
  ASSERT_EQ(tables_properties.size(), 1U);
  EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
  EXPECT_EQ(mgr_bar->last_specific_decompressor_type_.LoadRelaxed(),
            kLZ4Compression);
  // "Foo" has no friend entry for "Bar", so it cannot read the "Bar" file...
  options.compression_manager = mgr_foo;
  ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
  // ...until a factory for Bar's compatibility name is registered with the
  // object library, making it discoverable at open time.
  auto& library = *ObjectLibrary::Default();
  library.AddFactory<CompressionManager>(
      mgr_bar->CompatibilityName(),
      [mgr_bar](const std::string& /*uri*/,
                std::unique_ptr<CompressionManager>* guard,
                std::string* /*errmsg*/) {
        *guard = std::make_unique<MyManager>(mgr_bar->CompatibilityName());
        return guard->get();
      });
  Reopen(options);
  ASSERT_EQ(Get("a").size(), kValueSize);
  ASSERT_EQ(Get("b").size(), kValueSize);
  ASSERT_EQ(Get("c").size(), kValueSize);
  ASSERT_EQ(Get("d").size(), kValueSize);
  ASSERT_EQ(Get("e").size(), kValueSize);
  ASSERT_EQ(Get("f").size(), kValueSize);
}
TEST_F(DBCompressionTest, FailWhenCompressionNotSupportedTest) {
  // Every compression type that may be compiled out. For each unsupported
  // one, both DB open and column-family creation must fail cleanly.
  const CompressionType kCandidates[] = {kZlibCompression, kBZip2Compression,
                                         kLZ4Compression, kLZ4HCCompression,
                                         kXpressCompression};
  for (CompressionType comp : kCandidates) {
    if (CompressionTypeSupported(comp)) {
      continue;
    }
    Options options = CurrentOptions();
    options.compression = comp;
    ASSERT_FALSE(TryReopen(options).ok());
    // The DB must still open fine once compression is disabled.
    options.compression = kNoCompression;
    ASSERT_OK(TryReopen(options));
    ColumnFamilyOptions cf_options(options);
    cf_options.compression = comp;
    ColumnFamilyHandle* handle;
    ASSERT_FALSE(db_->CreateColumnFamily(cf_options, "name", &handle).ok());
  }
}
// Flush-block policy used to drive and verify the auto-skip compression
// manager: every `window` keys it installs a sync-point callback forcing the
// manager's explore/exploit decision, and checks the compression ticker
// counts accumulated over the previous window.
class AutoSkipTestFlushBlockPolicy : public FlushBlockPolicy {
 public:
  explicit AutoSkipTestFlushBlockPolicy(const int window,
                                        const BlockBuilder& data_block_builder,
                                        std::shared_ptr<Statistics> statistics)
      : window_(window),
        num_keys_(0),
        data_block_builder_(data_block_builder),
        statistics_(statistics) {}
  bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
    auto nth_window = num_keys_ / window_;
    if (data_block_builder_.empty()) {
      // Never cut an empty block.
      return false;
    }
    if (num_keys_ % window_ == 0) {
      // Callbacks that overwrite the auto-skip compressor's
      // exploration flag through the sync point below.
      auto set_exploration = [&](void* arg) {
        bool* exploration = static_cast<bool*>(arg);
        *exploration = true;
      };
      auto unset_exploration = [&](void* arg) {
        bool* exploration = static_cast<bool*>(arg);
        *exploration = false;
      };
      SyncPoint::GetInstance()->DisableProcessing();
      SyncPoint::GetInstance()->ClearAllCallBacks();
      // Alternate per window: even windows force exploration, odd windows
      // force exploitation.
      if (nth_window % 2 == 0) {
        SyncPoint::GetInstance()->SetCallBack(
            "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
            set_exploration);
      } else {
        SyncPoint::GetInstance()->SetCallBack(
            "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
            unset_exploration);
      }
      SyncPoint::GetInstance()->EnableProcessing();
      // Pop the tickers accumulated over the previous window and verify the
      // compressed/bypassed/rejected split. The expected percentages match
      // the key mixes written by the DBAutoSkip test (e.g. 6 incompressible
      // + 4 compressible keys => 60% rejected / 40% compressed).
      auto compressed_count = PopStat(NUMBER_BLOCK_COMPRESSED);
      auto bypassed_count = PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED);
      auto rejected_count = PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED);
      auto total = compressed_count + rejected_count + bypassed_count;
      int rejection_percentage, bypassed_percentage, compressed_percentage;
      if (total != 0) {
        rejection_percentage = static_cast<int>(rejected_count * 100 / total);
        bypassed_percentage = static_cast<int>(bypassed_count * 100 / total);
        compressed_percentage =
            static_cast<int>(compressed_count * 100 / total);
        switch (nth_window) {
          case 1:
            EXPECT_EQ(rejection_percentage, 60);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 40);
            break;
          case 2:
            // Exploitation window: all blocks bypassed, none attempted --
            // presumably because the manager predicts them incompressible.
            EXPECT_EQ(rejection_percentage, 0);
            EXPECT_EQ(bypassed_percentage, 100);
            EXPECT_EQ(compressed_percentage, 0);
            break;
          case 3:
            EXPECT_EQ(rejection_percentage, 40);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 60);
            break;
          case 4:
            EXPECT_EQ(rejection_percentage, 60);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 40);
        }
      }
    }
    num_keys_++;
    return true;
  }
  // Reads and clears a ticker so each window is measured independently.
  uint64_t PopStat(Tickers t) { return statistics_->getAndResetTickerCount(t); }

 private:
  int window_;
  int num_keys_;
  const BlockBuilder& data_block_builder_;
  std::shared_ptr<Statistics> statistics_;
};
class AutoSkipTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit AutoSkipTestFlushBlockPolicyFactory(
const int window, std::shared_ptr<Statistics> statistics)
: window_(window), statistics_(statistics) {}
virtual const char* Name() const override {
return "AutoSkipTestFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& ,
const BlockBuilder& data_block_builder) const override {
(void)data_block_builder;
return new AutoSkipTestFlushBlockPolicy(window_, data_block_builder,
statistics_);
}
private:
int window_;
std::shared_ptr<Statistics> statistics_;
};
class DBAutoSkip : public DBTestBase {
public:
Options options;
Random rnd_;
int key_index_;
DBAutoSkip()
: DBTestBase("db_auto_skip", true),
options(CurrentOptions()),
rnd_(231),
key_index_(0) {
options.compression_manager = CreateAutoSkipCompressionManager();
auto statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics = statistics;
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
bbto.flush_block_policy_factory.reset(
new AutoSkipTestFlushBlockPolicyFactory(10, statistics));
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
}
bool CompressionFriendlyPut(const int no_of_kvs, const int size_of_value) {
auto value = std::string(size_of_value, 'A');
for (int i = 0; i < no_of_kvs; ++i) {
auto status = Put(Key(key_index_), value);
EXPECT_EQ(status.ok(), true);
key_index_++;
}
return true;
}
bool CompressionUnfriendlyPut(const int no_of_kvs, const int size_of_value) {
auto value = rnd_.RandomBinaryString(size_of_value);
for (int i = 0; i < no_of_kvs; ++i) {
auto status = Put(Key(key_index_), value);
EXPECT_EQ(status.ok(), true);
key_index_++;
}
return true;
}
};
TEST_F(DBAutoSkip, AutoSkipCompressionManager) {
  // Runs the auto-skip manager over every supported compression type, with
  // and without dictionary compression. The flush-block policy installed by
  // the DBAutoSkip fixture checks per-window statistics, so the exact order
  // and counts of the puts below matter (10 keys fill one window).
  for (uint32_t max_dict_bytes : {0, 10000}) {
    for (auto type : GetSupportedCompressions()) {
      if (type == kNoCompression) {
        continue;
      }
      options.compression = type;
      options.bottommost_compression = type;
      options.compression_opts.max_dict_bytes = max_dict_bytes;
      DestroyAndReopen(options);
      const int kValueSize = 20000;
      // Window mixes (incompressible/compressible): 6+4, 6+4, 4+6, 6+4,
      // then 10 compressible -- these drive the percentage checks in
      // AutoSkipTestFlushBlockPolicy.
      CompressionUnfriendlyPut(6, kValueSize);
      CompressionFriendlyPut(4, kValueSize);
      CompressionUnfriendlyPut(6, kValueSize);
      CompressionFriendlyPut(4, kValueSize);
      CompressionUnfriendlyPut(4, kValueSize);
      CompressionFriendlyPut(6, kValueSize);
      CompressionUnfriendlyPut(6, kValueSize);
      CompressionFriendlyPut(4, kValueSize);
      CompressionFriendlyPut(6, kValueSize);
      CompressionFriendlyPut(4, kValueSize);
      ASSERT_OK(Flush());
    }
  }
}
class CostAwareTestFlushBlockPolicy : public FlushBlockPolicy {
public:
explicit CostAwareTestFlushBlockPolicy(const int window,
const BlockBuilder& data_block_builder)
: window_(window),
num_keys_(0),
data_block_builder_(data_block_builder) {}
bool Update(const Slice& , const Slice& ) override {
auto nth_window = num_keys_ / window_;
if (data_block_builder_.empty()) {
return false;
}
if (num_keys_ % window_ == 0) {
auto get_predictor = [&](void* arg) {
predictor_ = static_cast<IOCPUCostPredictor*>(arg);
predictor_->CPUPredictor.SetPrediction(1000);
predictor_->IOPredictor.SetPrediction(100);
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"CostAwareCompressor::CompressBlockAndRecord::"
"GetPredictor",
get_predictor);
SyncPoint::GetInstance()->EnableProcessing();
switch (nth_window) {
case 0:
break;
case 1:
auto predicted_cpu_time = predictor_->CPUPredictor.Predict();
auto predicted_io_bytes = predictor_->IOPredictor.Predict();
EXPECT_EQ(predicted_io_bytes, 100);
EXPECT_EQ(predicted_cpu_time, 1000);
break;
}
}
num_keys_++;
return true;
}
private:
int window_;
int num_keys_;
const BlockBuilder& data_block_builder_;
IOCPUCostPredictor* predictor_;
};
class CostAwareTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit CostAwareTestFlushBlockPolicyFactory(const int window)
: window_(window) {}
virtual const char* Name() const override {
return "CostAwareTestFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& ,
const BlockBuilder& data_block_builder) const override {
(void)data_block_builder;
return new CostAwareTestFlushBlockPolicy(window_, data_block_builder);
}
private:
int window_;
};
// Fixture wiring up the cost-aware compression manager with the predictor
// verification policy (window = 10 keys). The DB is opened in the ctor.
class DBCompressionCostPredictor : public DBTestBase {
 public:
  Options options;
  DBCompressionCostPredictor()
      : DBTestBase("db_cpuio_skip", true), options(CurrentOptions()) {
    options.compression_manager = CreateCostAwareCompressionManager();
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
    BlockBasedTableOptions bbto;
    bbto.enable_index_compression = false;
    bbto.flush_block_policy_factory.reset(
        new CostAwareTestFlushBlockPolicyFactory(10));
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    DestroyAndReopen(options);
  }
};
TEST_F(DBCompressionCostPredictor, CostAwareCompressorManager) {
  // The cost-aware manager requires ZSTD.
  if (!ZSTD_Supported()) {
    return;
  }
  const int kValueSize = 20000;
  const int kWindowSize = 10;
  Random rnd(231);
  const std::string payload = rnd.RandomBinaryString(kValueSize);
  int next_key = 0;
  // Writes one policy window's worth of identical incompressible values.
  auto write_one_window = [&]() {
    for (int i = 0; i < kWindowSize; ++i) {
      EXPECT_OK(Put(Key(next_key), payload));
      ++next_key;
    }
  };
  // Two windows: the policy seeds predictions in window 0 and verifies them
  // when window 1 begins.
  write_one_window();
  write_one_window();
  ASSERT_OK(Flush());
}
TEST_F(DBCompressionTest, PreDefinedDictionaryCompression) {
if (!ZSTD_Supported()) {
ROCKSDB_GTEST_BYPASS("ZSTD compression not supported");
return;
}
class PreDefinedDictCompressor : public CompressorWrapper {
public:
explicit PreDefinedDictCompressor(std::unique_ptr<Compressor> wrapped,
std::string dict_data)
: CompressorWrapper(std::move(wrapped)),
predefined_dict_(std::move(dict_data)) {}
const char* Name() const override { return "PreDefinedDictCompressor"; }
DictConfig GetDictGuidance(CacheEntryRole block_type) const override {
if (block_type == CacheEntryRole::kDataBlock &&
!predefined_dict_.empty()) {
return DictPreDefined{ predefined_dict_};
}
return DictDisabled{};
}
std::unique_ptr<Compressor> Clone() const override {
return std::make_unique<PreDefinedDictCompressor>(wrapped_->Clone(),
predefined_dict_);
}
std::unique_ptr<Compressor> MaybeCloneSpecialized(
CacheEntryRole block_type,
DictConfigArgs&& dict_config) const override {
auto specialized =
wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_config));
if (specialized) {
return specialized;
}
return nullptr;
}
private:
std::string predefined_dict_;
};
static const char* kTestCompatibilityName = "PreDefinedDictTest";
class PreDefinedDictManager : public CompressionManagerWrapper {
public:
explicit PreDefinedDictManager(std::shared_ptr<CompressionManager> wrapped,
std::string dict_data)
: CompressionManagerWrapper(std::move(wrapped)),
predefined_dict_(std::move(dict_data)) {}
const char* Name() const override { return "PreDefinedDictManager"; }
const char* CompatibilityName() const override {
return kTestCompatibilityName;
}
std::unique_ptr<Compressor> GetCompressorForSST(
const FilterBuildingContext& context, const CompressionOptions& opts,
CompressionType preferred) override {
auto base = wrapped_->GetCompressorForSST(context, opts, preferred);
if (base) {
return std::make_unique<PreDefinedDictCompressor>(std::move(base),
predefined_dict_);
}
return nullptr;
}
private:
std::string predefined_dict_;
};
// A deliberately broken manager for negative testing: it writes through the
// wrapped (working) manager but hands back decompressors that remember the
// serialized dictionary without ever feeding it to the wrapped decompressor.
// Reading dictionary-compressed blocks through it is expected to fail
// (surfacing as corruption in the test below).
class BrokenDictManager : public CompressionManagerWrapper {
 public:
  explicit BrokenDictManager(std::shared_ptr<CompressionManager> wrapped)
      : CompressionManagerWrapper(std::move(wrapped)) {}
  const char* Name() const override { return "BrokenDictManager"; }
  // Same compatibility name as PreDefinedDictManager so files written
  // there are routed to this manager's decompressors on read.
  const char* CompatibilityName() const override {
    return kTestCompatibilityName;
  }
  std::shared_ptr<Decompressor> GetDecompressor() override {
    return std::make_shared<IgnoreDictDecompressor>(
        wrapped_->GetDecompressor());
  }
  std::shared_ptr<Decompressor> GetDecompressorOptimizeFor(
      CompressionType optimize_for_type) override {
    return std::make_shared<IgnoreDictDecompressor>(
        wrapped_->GetDecompressorOptimizeFor(optimize_for_type));
  }
  std::shared_ptr<Decompressor> GetDecompressorForTypes(
      const CompressionType* types_begin,
      const CompressionType* types_end) override {
    return std::make_shared<IgnoreDictDecompressor>(
        wrapped_->GetDecompressorForTypes(types_begin, types_end));
  }

 private:
  // Reports the serialized dictionary it was cloned for but delegates
  // decompression to the wrapped decompressor without it.
  class IgnoreDictDecompressor : public DecompressorWrapper {
   public:
    // Delegate so both construction paths initialize dict_slice_ the same
    // way (as a view over dict_).
    explicit IgnoreDictDecompressor(std::shared_ptr<Decompressor> wrapped)
        : IgnoreDictDecompressor(std::move(wrapped), std::string()) {}
    IgnoreDictDecompressor(std::shared_ptr<Decompressor> wrapped,
                           std::string dict)
        : DecompressorWrapper(std::move(wrapped)),
          dict_(std::move(dict)),
          dict_slice_(dict_) {}
    // dict_slice_ points into dict_, so the implicitly-generated copy/move
    // operations would leave it dangling; this object is only ever handled
    // through smart pointers, so forbid copying outright.
    IgnoreDictDecompressor(const IgnoreDictDecompressor&) = delete;
    IgnoreDictDecompressor& operator=(const IgnoreDictDecompressor&) = delete;
    const char* Name() const override { return "IgnoreDictDecompressor"; }
    const Slice& GetSerializedDict() const override { return dict_slice_; }
    // Clone for a specific dictionary: stores the bytes (for
    // GetSerializedDict) but, by design, never configures the wrapped
    // decompressor with them.
    Status MaybeCloneForDict(const Slice& serialized_dict,
                             std::unique_ptr<Decompressor>* out) override {
      *out = std::make_unique<IgnoreDictDecompressor>(
          wrapped_,
          std::string(serialized_dict.data(), serialized_dict.size()));
      return Status::OK();
    }

   private:
    std::string dict_;  // owned dictionary bytes (intentionally unused)
    Slice dict_slice_;  // view over dict_; valid for this object's lifetime
  };
};
// Build a ~10KB dictionary from 50 random 200-byte patterns, remembering
// each pattern so values can be composed from them (and thus compress
// very well against the dictionary).
Random rnd(42);
std::string predefined_dict;
std::vector<std::string> dict_patterns;
for (int i = 0; i < 50; i++) {
std::string pattern = rnd.RandomString(200);
dict_patterns.push_back(pattern);
predefined_dict += pattern;
}
size_t kDictSize = predefined_dict.size();
// Manager that forces every SST to compress with predefined_dict.
auto mgr = std::make_shared<PreDefinedDictManager>(
GetBuiltinV2CompressionManager(), predefined_dict);
Options options = CurrentOptions();
options.compression = kZSTD;
options.compression_opts.max_dict_bytes = static_cast<int>(kDictSize);
options.compression_manager = mgr;
// Statistics needed for the ticker assertion below.
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.enable_index_compression = true;
// NOTE(review): presumably a recent format version is required for the
// custom compression-manager/dictionary path -- confirm against the
// format_version documentation.
bbto.format_version = 7;
bbto.block_cache = NewLRUCache(1 << 20);
// Cache index/filter blocks so dictionary block-cache insertions are
// observable via tickers.
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
// Write 100 values, each the concatenation of 5 randomly-chosen
// dictionary patterns, then flush them into an SST.
std::vector<std::string> expected_values;
for (int i = 0; i < 100; i++) {
std::string value;
for (int j = 0; j < 5; j++) {
value +=
dict_patterns[rnd.Uniform(static_cast<int>(dict_patterns.size()))];
}
expected_values.push_back(value);
ASSERT_OK(Put(Key(i), value));
}
ASSERT_OK(Flush());
// The dictionary inserted into the block cache should be at least as
// large as the predefined dictionary itself.
ASSERT_GE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
predefined_dict.size());
// All values must read back intact through the dict-aware decompressor.
for (int i = 0; i < 100; i++) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
ASSERT_EQ(value, expected_values[i]);
}
Close();
// Reopen the same files under a manager whose decompressors drop the
// dictionary. Use a fresh block cache so no uncompressed blocks from the
// first open can satisfy the read; the Get must then fail as corruption.
auto broken_mgr =
std::make_shared<BrokenDictManager>(GetBuiltinV2CompressionManager());
options.compression_manager = broken_mgr;
bbto.block_cache = NewLRUCache(1 << 20);
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(TryReopen(options));
std::string value;
ASSERT_EQ(db_->Get(ReadOptions(), Key(0), &value).code(),
Status::kCorruption);
}
}  // namespace ROCKSDB_NAMESPACE

// Standard RocksDB test driver: install the stack-trace handler for crash
// diagnostics, initialize gtest, register custom objects from the command
// line, then run every test in this binary.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}