#include <atomic>
#include <cstdlib>
#include <functional>
#include <memory>
#include "db/db_test_util.h"
#include "db/read_callback.h"
#include "db/version_edit.h"
#include "env/fs_readonly.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/experimental.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/trace_record_result.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "test_util/testutil.h"
#include "util/defer.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
// Fixture shared by the DBTest2 cases in this file. It adds no state of its
// own; it only forwards the string "db_test2" and the flag `true` to
// DBTestBase.
// NOTE(review): the meaning of the boolean is defined by DBTestBase, which is
// not visible here -- confirm before changing it.
class DBTest2 : public DBTestBase {
 public:
  DBTest2()
      : DBTestBase("db_test2", true) {
  }
};
// A read-only open of a DB that was never created has to fail, with or
// without create_if_missing, and once the directory has been removed a failed
// open must not bring it back.
TEST_F(DBTest2, OpenForReadOnly) {
  const std::string db_path = test::PerThreadDBPath("db_readonly");
  std::unique_ptr<DB> read_only_db;

  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  // create_if_missing does not rescue a read-only open.
  ASSERT_NOK(DB::OpenForReadOnly(opts, db_path, &read_only_db));

  // The directory is expected to be listable after the failed attempt.
  // Empty it and remove it so the second attempt starts from nothing.
  std::vector<std::string> leftovers;
  ASSERT_OK(env_->GetChildren(db_path, &leftovers));
  for (const std::string& name : leftovers) {
    ASSERT_OK(env_->DeleteFile(db_path + "/" + name));
  }
  ASSERT_OK(env_->DeleteDir(db_path));

  opts.create_if_missing = false;
  ASSERT_NOK(DB::OpenForReadOnly(opts, db_path, &read_only_db));
  // This time the failed open must have left nothing behind.
  ASSERT_NOK(env_->FileExists(db_path));
}
// Same scenario as the plain read-only open test, but going through the
// overload that takes column family descriptors: the open must fail both
// times, and the second failure must not recreate the directory.
TEST_F(DBTest2, OpenForReadOnlyWithColumnFamilies) {
  const std::string db_path = test::PerThreadDBPath("db_readonly");
  std::unique_ptr<DB> read_only_db;

  Options opts = CurrentOptions();
  opts.create_if_missing = true;

  // Ask for the default column family plus one extra family named "goku".
  const ColumnFamilyOptions cf_opts(opts);
  std::vector<ColumnFamilyDescriptor> cf_descriptors;
  cf_descriptors.emplace_back(kDefaultColumnFamilyName, cf_opts);
  cf_descriptors.emplace_back("goku", cf_opts);
  std::vector<ColumnFamilyHandle*> cf_handles;

  // create_if_missing does not rescue a read-only open.
  ASSERT_NOK(DB::OpenForReadOnly(opts, db_path, cf_descriptors, &cf_handles,
                                 &read_only_db));

  // The directory is expected to be listable after the failed attempt.
  // Empty it and remove it so the second attempt starts from nothing.
  std::vector<std::string> leftovers;
  ASSERT_OK(env_->GetChildren(db_path, &leftovers));
  for (const std::string& name : leftovers) {
    ASSERT_OK(env_->DeleteFile(db_path + "/" + name));
  }
  ASSERT_OK(env_->DeleteDir(db_path));

  opts.create_if_missing = false;
  ASSERT_NOK(DB::OpenForReadOnly(opts, db_path, cf_descriptors, &cf_handles,
                                 &read_only_db));
  // This time the failed open must have left nothing behind.
  ASSERT_NOK(env_->FileExists(db_path));
}
// Flush listener that checks the table properties reported for every
// completed flush: the index must have more than one partition and
// index_key_is_user_key must be 0.
class PartitionedIndexTestListener : public EventListener {
 public:
  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
    const auto& props = info.table_properties;
    ASSERT_GT(props.index_partitions, 1);
    ASSERT_EQ(props.index_key_is_user_key, 0);
  }
};
// Writes a small set of distinct keys over and over, taking a snapshot after
// every Put, and flushes three times with a two-level index configured. The
// actual checks live in PartitionedIndexTestListener, which inspects the
// table properties of each completed flush.
TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
  const int kValueSize = 10500;
  const int kNumEntriesPerFile = 1000;
  const int kNumFiles = 3;
  const int kNumDistinctKeys = 30;

  BlockBasedTableOptions table_options;
  table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  // Construct the listener directly as a shared_ptr so it is never held by a
  // raw owning pointer.
  options.listeners.push_back(
      std::make_shared<PartitionedIndexTestListener>());
  Reopen(options);

  std::vector<const Snapshot*> snapshots;
  snapshots.reserve(static_cast<size_t>(kNumFiles) * kNumEntriesPerFile);
  // Release every snapshot on all exit paths. A failing ASSERT_* returns from
  // the test body immediately, and without this guard the snapshots taken so
  // far would still be outstanding when the fixture tears the DB down.
  Defer release_snapshots([&]() {
    for (const Snapshot* snapshot : snapshots) {
      db_->ReleaseSnapshot(snapshot);
    }
  });

  Random rnd(301);
  for (int i = 0; i < kNumFiles; i++) {
    for (int j = 0; j < kNumEntriesPerFile; j++) {
      // Cycle through kNumDistinctKeys keys so each one is overwritten often.
      const int key_id = (i * kNumEntriesPerFile + j) % kNumDistinctKeys;
      const std::string value = rnd.RandomString(kValueSize);
      ASSERT_OK(Put("keykey_" + std::to_string(key_id), value));
      snapshots.push_back(db_->GetSnapshot());
    }
    ASSERT_OK(Flush());
  }
}
// Bool-parameterized fixture. The parameter is stored in if_cache_filter_
// and read by the test body to decide whether to configure a block cache.
class PrefixFullBloomWithReverseComparator
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  PrefixFullBloomWithReverseComparator()
      : DBTestBase("prefix_bloom_reverse", true) {
  }

  // Latch the test parameter before the test body runs.
  void SetUp() override {
    if_cache_filter_ = GetParam();
  }

  bool if_cache_filter_;
};
// Exercises iterator seeks on a DB that combines a reverse bytewise
// comparator, a capped 3-byte prefix extractor and a bloom filter with
// whole-key filtering turned off.
TEST_P(PrefixFullBloomWithReverseComparator,
       PrefixFullBloomWithReverseComparator) {
  Options options = last_options_;
  options.comparator = ReverseBytewiseComparator();
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();

  BlockBasedTableOptions table_opts;
  if (if_cache_filter_) {
    // Variant with index/filter blocks going through a 1-byte LRU cache.
    table_opts.no_block_cache = false;
    table_opts.cache_index_and_filter_blocks = true;
    table_opts.block_cache = NewLRUCache(1);
  }
  table_opts.filter_policy.reset(NewBloomFilterPolicy(10, false));
  table_opts.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));
  DestroyAndReopen(options);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));
  ASSERT_OK(dbfull()->Flush(FlushOptions()));

  // Drop unreferenced cache entries before reading, when a cache is in use.
  if (table_opts.block_cache != nullptr) {
    table_opts.block_cache->EraseUnRefEntries();
  }

  std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));

  // Seeking to "bar345" must land on "bar234", followed by "bar123".
  it->Seek("bar345");
  ASSERT_OK(it->status());
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("bar234", it->key().ToString());
  ASSERT_EQ("foo2", it->value().ToString());
  it->Next();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("bar123", it->key().ToString());
  ASSERT_EQ("foo", it->value().ToString());

  // Seeking to "foo234" must land on "foo123".
  it->Seek("foo234");
  ASSERT_OK(it->status());
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("foo123", it->key().ToString());
  ASSERT_EQ("foo3", it->value().ToString());

  // Seeking to "bar" must find nothing, without reporting an error.
  it->Seek("bar");
  ASSERT_OK(it->status());
  ASSERT_FALSE(it->Valid());
}
// Instantiate the suite over both boolean parameter values; the fixture
// stores the parameter in if_cache_filter_.
INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
PrefixFullBloomWithReverseComparator, testing::Bool());
// Checks the "rocksdb.iterator.super-version-number" iterator property:
//  * an iterator created after a flush reports a larger number than one
//    created before it,
//  * an iterator created after a further Put with no flush reports the same
//    number as the post-flush iterator,
//  * an existing iterator keeps reporting its original number after seeking.
//
// Iterators are held in std::unique_ptr so they are destroyed even when a
// failing ASSERT_* returns from the test body early.
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
  const std::string kVersionProp = "rocksdb.iterator.super-version-number";
  std::string prop_value;

  ASSERT_OK(Put("", ""));
  std::unique_ptr<Iterator> iter1(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter1->status());
  ASSERT_OK(iter1->GetProperty(kVersionProp, &prop_value));
  const uint64_t version_number1 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  // Write again and flush before creating the second iterator.
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  std::unique_ptr<Iterator> iter2(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter2->status());
  ASSERT_OK(iter2->GetProperty(kVersionProp, &prop_value));
  const uint64_t version_number2 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_GT(version_number2, version_number1);

  // A Put with no flush must not change the reported number.
  ASSERT_OK(Put("", ""));
  std::unique_ptr<Iterator> iter3(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter3->status());
  ASSERT_OK(iter3->GetProperty(kVersionProp, &prop_value));
  const uint64_t version_number3 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_EQ(version_number2, version_number3);

  // The first iterator still reports the number it started with.
  iter1->SeekToFirst();
  ASSERT_OK(iter1->GetProperty(kVersionProp, &prop_value));
  const uint64_t version_number1_new =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_EQ(version_number1, version_number1_new);
}
// Flushes two keys into a non-default column family with
// cache_index_and_filter_blocks enabled, reopens the DB, and reads one key
// back. The value read is not inspected; only the reopen is asserted.
TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));

  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "a", "begin"));
  ASSERT_OK(Put(1, "z", "end"));
  ASSERT_OK(Flush(1));

  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));

  // Read after the restart; the result itself is intentionally unchecked.
  std::string read_back;
  read_back = Get(1, "a");
}
// Writes one Put followed by three Merges with max_successive_merges = 3,
// then reopens the DB with the limit lowered to 2. Nothing is asserted after
// the reopen, so the test only requires that the reopen goes through.
TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.merge_operator = MergeOperators::CreatePutOperator();
  options.max_successive_merges = 3;
  DestroyAndReopen(options);

  ASSERT_OK(Put("poi", "Finch"));
  for (const char* operand : {"Reese", "Shaw", "Root"}) {
    ASSERT_OK(db_->Merge(WriteOptions(), "poi", operand));
  }

  // Reopen with a smaller limit than the number of merges already written.
  options.max_successive_merges = 2;
  Reopen(options);
}
// Fixture parameterized on a (use_old_interface_, cost_cache_) pair; the test
// body reads both flags to decide how the shared write buffer limit is
// configured.
class DBTestSharedWriteBufferAcrossCFs
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  DBTestSharedWriteBufferAcrossCFs()
      : DBTestBase("db_test_shared_write_buffer", true) {
  }

  // Unpack the parameter tuple before the test body runs.
  void SetUp() override {
    std::tie(use_old_interface_, cost_cache_) = GetParam();
  }

  bool use_old_interface_;
  bool cost_cache_;
};
// Drives writes into four column families (default, pikachu, dobrynia,
// nikitich) that share one write buffer limit, and after each group of
// writes checks how many SST files every family has. Depending on the test
// parameters the limit comes from Options::db_write_buffer_size, from a
// WriteBufferManager, or from a WriteBufferManager that is also handed a
// cache, in which case the cache usage is checked as well.
//
// The order of the Put / wait steps below is significant; do not reorder.
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  auto flush_listener = std::make_shared<FlushCounterListener>();
  options.listeners.push_back(flush_listener);
  // Keep shutdown from adding flushes of its own.
  options.avoid_flush_during_shutdown = true;

  // Sync point hooks into the arena: overwrite the size_t handed to
  // "Arena::Arena:0" with 1, and at "Arena::AllocateNewBlock:0" copy the
  // second pointee of the pair into the first.
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->SetCallBack("Arena::Arena:0", [](void* arg) {
    *static_cast<size_t*>(arg) = 1;
  });
  sync_point->SetCallBack("Arena::AllocateNewBlock:0", [](void* arg) {
    auto* sizes = static_cast<std::pair<size_t*, size_t*>*>(arg);
    *sizes->first = *sizes->second;
  });
  sync_point->EnableProcessing();

  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);

  // Pick the way the shared limit is configured.
  if (use_old_interface_) {
    options.db_write_buffer_size = 120000;
  } else if (!cost_cache_) {
    options.write_buffer_manager.reset(new WriteBufferManager(114285));
  } else {
    options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
  }
  options.write_buffer_size = 500000;
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  WriteOptions write_opts;
  write_opts.disableWAL = true;

  // Waits for pending memtable flushes of all four column families, then for
  // the remaining background work.
  auto wait_for_flushes = [&]() {
    for (size_t cf = 0; cf < 4; ++cf) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
    }
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  };
  // Number of SST files currently held by the named column family.
  auto sst_files = [&](const std::string& cf_name) {
    return GetNumberOfSstFilesForColumnFamily(db_.get(), cf_name);
  };

  // Manual flushes first: one SST each for "nikitich" and "default".
  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
  ASSERT_OK(Put(3, Key(1), DummyString(1), write_opts));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), write_opts));
  ASSERT_OK(Put(0, Key(1), DummyString(1), write_opts));
  ASSERT_OK(Flush(0));
  ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(1));
  ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(1));

  // From here on, every flush is expected to be attributed to the write
  // buffer manager.
  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
  ASSERT_OK(Put(3, Key(1), DummyString(30000), write_opts));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
  }
  wait_for_flushes();
  ASSERT_OK(Put(0, Key(1), DummyString(60000), write_opts));
  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    ASSERT_LE(cache->GetUsage(), 2 * 256 * 1024);
  }
  wait_for_flushes();
  ASSERT_OK(Put(2, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  // Nothing beyond the two manual flushes yet.
  {
    ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(1));
    ASSERT_EQ(sst_files("pikachu"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("dobrynia"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(1));
  }

  ASSERT_OK(Put(3, Key(2), DummyString(30000), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(0, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  // "nikitich" is expected to have gained a second file.
  {
    ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(1));
    ASSERT_EQ(sst_files("pikachu"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("dobrynia"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(2));
  }

  ASSERT_OK(Put(2, Key(1), DummyString(30000), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(2, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(2, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  // These writes to "dobrynia" must not have caused any new file.
  {
    ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(1));
    ASSERT_EQ(sst_files("pikachu"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("dobrynia"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(2));
  }

  ASSERT_OK(Put(2, Key(2), DummyString(10000), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(3, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(0, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(0, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(0, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  // "default" is expected to have gained a second file.
  {
    ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(2));
    ASSERT_EQ(sst_files("pikachu"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("dobrynia"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(2));
  }

  ASSERT_OK(Put(1, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(2, Key(1), DummyString(80000), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(1, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  ASSERT_OK(Put(2, Key(1), DummyString(1), write_opts));
  wait_for_flushes();
  // "dobrynia" is expected to have its first file; "pikachu" still has none.
  {
    ASSERT_EQ(sst_files("default"), static_cast<uint64_t>(2));
    ASSERT_EQ(sst_files("pikachu"), static_cast<uint64_t>(0));
    ASSERT_EQ(sst_files("dobrynia"), static_cast<uint64_t>(1));
    ASSERT_EQ(sst_files("nikitich"), static_cast<uint64_t>(2));
  }

  if (cost_cache_) {
    ASSERT_GE(cache->GetUsage(), 256 * 1024);
    Close();
    // Drop both remaining references to the write buffer manager; the cache
    // usage is expected to fall back under 256KB afterwards.
    options.write_buffer_manager.reset();
    last_options_.write_buffer_manager.reset();
    ASSERT_LT(cache->GetUsage(), 256 * 1024);
  }
  sync_point->DisableProcessing();
}
// Each tuple is (use_old_interface_, cost_cache_), in the order the fixture's
// SetUp() unpacks them. The (true, true) combination is not instantiated.
INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
DBTestSharedWriteBufferAcrossCFs,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, false),
std::make_tuple(false, true)));
// Two DB instances share one WriteBufferManager (limit 120000). The fixture
// DB has column families default / cf1 / cf2; the second DB only uses its
// default family. After each group of writes the test checks how many SST
// files each side has.
//
// The order of the Put / wait steps below is significant; do not reorder.
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
  const std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  auto flush_listener = std::make_shared<FlushCounterListener>();
  options.listeners.push_back(flush_listener);
  // Keep shutdown from adding flushes of its own.
  options.avoid_flush_during_shutdown = true;

  // Sync point hooks into the arena: overwrite the size_t handed to
  // "Arena::Arena:0" with 1, and at "Arena::AllocateNewBlock:0" copy the
  // second pointee of the pair into the first.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::Arena:0", [&](void* arg) {
        size_t* block_size = static_cast<size_t*>(arg);
        *block_size = 1;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Arena::AllocateNewBlock:0", [&](void* arg) {
        std::pair<size_t*, size_t*>* pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        *std::get<0>(*pair) = *std::get<1>(*pair);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 500000;
  options.write_buffer_manager.reset(new WriteBufferManager(120000));
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  // Second DB, opened with the same Options and therefore the same manager.
  ASSERT_OK(DestroyDB(dbname2, options));
  std::unique_ptr<DB> db2;
  ASSERT_OK(DB::Open(options, dbname2, &db2));

  // Single place that downcasts db2, always through the checked cast (the
  // block previously mixed it with a plain static_cast).
  auto db2_impl = [&]() { return static_cast_with_check<DBImpl>(db2.get()); };

  WriteOptions wo;
  wo.disableWAL = true;

  // Waits for pending memtable flushes in both DBs, then for the remaining
  // background work in both.
  std::function<void()> wait_flush = [&]() {
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
    ASSERT_OK(db2_impl()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
    ASSERT_OK(db2_impl()->TEST_WaitForBackgroundWork());
  };

  // Every flush in this test is expected to be attributed to the write
  // buffer manager.
  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;

  ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
  wait_flush();
  ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
  wait_flush();
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  ASSERT_OK(db2_impl()->TEST_WaitForFlushMemTable());
  // Exactly one file across the first DB's families, none in the second DB.
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "default") +
                  GetNumberOfSstFilesForColumnFamily(db_.get(), "cf1") +
                  GetNumberOfSstFilesForColumnFamily(db_.get(), "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2.get(), "default"),
              static_cast<uint64_t>(0));
  }

  ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
  wait_flush();
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  wait_flush();
  // First DB: one file each in "default" and "cf2"; second DB still empty.
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2.get(), "default"),
              static_cast<uint64_t>(0));
  }

  ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
  wait_flush();
  ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
  wait_flush();
  ASSERT_OK(db2_impl()->TEST_WaitForFlushMemTable());
  // The second DB is now expected to have its first file.
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2.get(), "default"),
              static_cast<uint64_t>(1));
  }

  // Close the second DB before destroying its files.
  db2.reset();
  ASSERT_OK(DestroyDB(dbname2, options));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Creates a WriteBufferManager with a buffer size of 0 plus a cache, and
// expects a single small Put to raise the cache usage above 128000.
TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;

  const LRUCacheOptions cache_opts(
      10000000 /* capacity */, 1 /* num_shard_bits */,
      false /* strict_capacity_limit */, 0.0 /* high_pri_pool_ratio */,
      nullptr /* memory_allocator */, kDefaultToAdaptiveMutex,
      kDontChargeCacheMetadata);
  std::shared_ptr<Cache> cache = NewLRUCache(cache_opts);

  options.write_buffer_size = 50000;
  options.write_buffer_manager.reset(new WriteBufferManager(0, cache));
  Reopen(options);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_GT(cache->GetUsage(), 128000);
}
namespace {
// Asserts, via MultiGet, that every key in `keys_must_exist` is readable from
// `db` and that every key in `keys_must_not_exist` comes back NotFound.
// An empty list is skipped without touching the DB.
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
                          const std::vector<Slice>& keys_must_not_exist) {
  std::vector<std::string> values;

  if (!keys_must_exist.empty()) {
    const std::vector<Status> found =
        db->MultiGet(ReadOptions(), keys_must_exist, &values);
    const size_t num_keys = keys_must_exist.size();
    for (size_t idx = 0; idx < num_keys; ++idx) {
      ASSERT_OK(found[idx]);
    }
  }

  if (!keys_must_not_exist.empty()) {
    const std::vector<Status> missing =
        db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
    const size_t num_keys = keys_must_not_exist.size();
    for (size_t idx = 0; idx < num_keys; ++idx) {
      ASSERT_TRUE(missing[idx].IsNotFound());
    }
  }
}
}  // namespace
// For every WalProcessingOption value, writes three two-key batches, reopens
// the DB with a WAL filter that returns that option for the record at index 1
// (and kContinueProcessing for every other record), and then checks which
// keys are present, both right after recovery and after one more reopen
// without the filter.
TEST_F(DBTest2, WalFilterTest) {
  class TestWalFilter : public WalFilter {
   private:
    // Option returned for the selected record.
    WalFilter::WalProcessingOption wal_processing_option_;
    // Index of the record the option applies to.
    size_t apply_option_at_record_index_;
    // Number of records seen so far; LogRecord() is const, hence mutable.
    mutable size_t current_record_index_;

   public:
    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
                  size_t apply_option_for_record_index)
        : wal_processing_option_(wal_processing_option),
          apply_option_at_record_index_(apply_option_for_record_index),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
                                  WriteBatch* /*new_batch*/,
                                  bool* /*batch_changed*/) const override {
      const bool is_selected_record =
          (current_record_index_ == apply_option_at_record_index_);
      ++current_record_index_;
      return is_selected_record ? wal_processing_option_
                                : WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override { return "TestWalFilter"; }
  };

  // Three batches of two keys each.
  const std::vector<std::vector<std::string>> batch_keys = {
      {"key1", "key2"}, {"key3", "key4"}, {"key5", "key6"}};

  const int kNumOptions = static_cast<int>(
      WalFilter::WalProcessingOption::kWalProcessingOptionMax);
  for (int option = 0; option < kNumOptions; option++) {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // One WAL record per batch, all into the default column family.
    for (const auto& keys : batch_keys) {
      WriteBatch batch;
      for (const auto& key : keys) {
        ASSERT_OK(batch.Put(handles_[0], key, DummyString(1024)));
      }
      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    }

    const WalFilter::WalProcessingOption wal_processing_option =
        static_cast<WalFilter::WalProcessingOption>(option);
    const size_t apply_option_for_record_index = 1;
    TestWalFilter test_wal_filter(wal_processing_option,
                                  apply_option_for_record_index);

    // Recover through the filter.
    options = OptionsForLogIterTest();
    options.wal_filter = &test_wal_filter;
    Status status =
        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
    if (wal_processing_option ==
        WalFilter::WalProcessingOption::kCorruptedRecord) {
      // Reporting corruption must fail the open; retry with paranoid checks
      // switched off so recovery can proceed.
      ASSERT_NOK(status);
      options.paranoid_checks = false;
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    } else {
      ASSERT_OK(status);
    }

    // Work out which keys should be visible for this option.
    std::vector<Slice> keys_must_exist;
    std::vector<Slice> keys_must_not_exist;
    switch (wal_processing_option) {
      case WalFilter::WalProcessingOption::kCorruptedRecord:
      case WalFilter::WalProcessingOption::kContinueProcessing: {
        fprintf(stderr, "Testing with complete WAL processing\n");
        // Every key is expected.
        for (const auto& keys : batch_keys) {
          for (const auto& key : keys) {
            keys_must_exist.emplace_back(key);
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
        fprintf(stderr,
                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
                apply_option_for_record_index);
        // Only the selected record's keys are missing.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          std::vector<Slice>& bucket = (i == apply_option_for_record_index)
                                           ? keys_must_not_exist
                                           : keys_must_exist;
          for (const auto& key : batch_keys[i]) {
            bucket.emplace_back(key);
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kStopReplay: {
        fprintf(stderr,
                "Testing with stopping replay from record %" ROCKSDB_PRIszt
                "\n",
                apply_option_for_record_index);
        // The selected record and everything after it are missing.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          std::vector<Slice>& bucket = (i >= apply_option_for_record_index)
                                           ? keys_must_not_exist
                                           : keys_must_exist;
          for (const auto& key : batch_keys[i]) {
            bucket.emplace_back(key);
          }
        }
        break;
      }
      default:
        FAIL();
    }

    // Check twice: straight after the filtered recovery, then once more
    // after a reopen that does not install the filter.
    for (int pass = 0; pass < 2; ++pass) {
      ValidateKeyExistence(db_.get(), keys_must_exist, keys_must_not_exist);
      if (pass == 0) {
        options = OptionsForLogIterTest();
        ReopenWithColumnFamilies({"default", "pikachu"}, options);
      }
    }
  }
}
// Recovers through a WAL filter that, from record index 1 onwards, replaces
// each batch with one holding only its first key. Checks that the dropped
// keys are missing and the rest are present, both right after recovery and
// after one more reopen without the filter.
TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
  // Copies at most `num_keys_to_add_in_new_batch` Puts from the batch being
  // iterated into `new_write_batch`.
  class ChangeBatchHandler : public WriteBatch::Handler {
   private:
    WriteBatch* new_write_batch_;
    size_t num_keys_to_add_in_new_batch_;
    size_t num_keys_added_;

   public:
    ChangeBatchHandler(WriteBatch* new_write_batch,
                       size_t num_keys_to_add_in_new_batch)
        : new_write_batch_(new_write_batch),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          num_keys_added_(0) {}

    void Put(const Slice& key, const Slice& value) override {
      if (num_keys_added_ >= num_keys_to_add_in_new_batch_) {
        return;  // Quota reached; drop the remaining keys.
      }
      ASSERT_OK(new_write_batch_->Put(key, value));
      ++num_keys_added_;
    }
  };

  class TestWalFilterWithChangeBatch : public WalFilter {
   private:
    // First record index whose batch gets rewritten.
    size_t change_records_from_index_;
    // Number of keys kept in each rewritten batch.
    size_t num_keys_to_add_in_new_batch_;
    // Number of records seen so far; LogRecord() is const, hence mutable.
    mutable size_t current_record_index_;

   public:
    TestWalFilterWithChangeBatch(size_t change_records_from_index,
                                 size_t num_keys_to_add_in_new_batch)
        : change_records_from_index_(change_records_from_index),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          current_record_index_(0) {}

    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      if (current_record_index_ >= change_records_from_index_) {
        ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
        Status s = batch.Iterate(&handler);
        if (!s.ok()) {
          assert(false);
        } else {
          *batch_changed = true;
        }
      }
      ++current_record_index_;
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override { return "TestWalFilterWithChangeBatch"; }
  };

  // Three batches of two keys each.
  const std::vector<std::vector<std::string>> batch_keys = {
      {"key1", "key2"}, {"key3", "key4"}, {"key5", "key6"}};

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // One WAL record per batch, all into the default column family.
  for (const auto& keys : batch_keys) {
    WriteBatch batch;
    for (const auto& key : keys) {
      ASSERT_OK(batch.Put(handles_[0], key, DummyString(1024)));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  const size_t change_records_from_index = 1;
  const size_t num_keys_to_add_in_new_batch = 1;
  TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
      change_records_from_index, num_keys_to_add_in_new_batch);

  // Recover through the filter.
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_with_change_batch;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // A key survives if its record precedes the rewrite point or if it is
  // among the first keys kept in a rewritten batch.
  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;
  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      const bool survives = (i < change_records_from_index) ||
                            (j < num_keys_to_add_in_new_batch);
      if (survives) {
        keys_must_exist.emplace_back(batch_keys[i][j]);
      } else {
        keys_must_not_exist.emplace_back(batch_keys[i][j]);
      }
    }
  }

  // Check twice: straight after the filtered recovery, then once more after
  // a reopen that does not install the filter.
  for (int pass = 0; pass < 2; ++pass) {
    ValidateKeyExistence(db_.get(), keys_must_exist, keys_must_not_exist);
    if (pass == 0) {
      options = OptionsForLogIterTest();
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    }
  }
}
// A WAL filter that appends an extra key to every batch must make recovery
// fail with NotSupported. A later reopen without the filter must succeed and
// still see every original key.
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
  class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
   public:
    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      // Start from a copy of the incoming batch, then add one more key.
      *new_batch = batch;
      Status s = new_batch->Put("key_extra", "value_extra");
      if (!s.ok()) {
        assert(false);
      } else {
        *batch_changed = true;
      }
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithChangeBatchExtraKeys";
    }
  };

  // Three batches of two keys each.
  const std::vector<std::vector<std::string>> batch_keys = {
      {"key1", "key2"}, {"key3", "key4"}, {"key5", "key6"}};

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // One WAL record per batch, all into the default column family.
  for (const auto& keys : batch_keys) {
    WriteBatch batch;
    for (const auto& key : keys) {
      ASSERT_OK(batch.Put(handles_[0], key, DummyString(1024)));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  // Recovery through the key-adding filter has to be rejected.
  TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_extra_keys;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.IsNotSupported());

  // Reopen without the filter; every original key must be readable.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::vector<Slice> keys_must_exist;
  const std::vector<Slice> keys_must_not_exist;  // Intentionally empty.
  for (const auto& keys : batch_keys) {
    for (const auto& key : keys) {
      keys_must_exist.emplace_back(key);
    }
  }
  ValidateKeyExistence(db_.get(), keys_must_exist, keys_must_not_exist);
}
// Writes the same keys to two column families, flushes only the default one,
// writes a second set of keys to both, and then recovers through a WAL filter
// that records, per column family, the keys of every record whose log number
// is at least that family's log number. Expected outcome: the default family
// only sees the post-flush keys, while "pikachu" sees the pre-flush keys
// followed by the post-flush keys.
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
  class TestWalFilterWithColumnFamilies : public WalFilter {
   private:
    // Column family id -> log number, as handed to ColumnFamilyLogNumberMap.
    std::map<uint32_t, uint64_t> cf_log_number_map_;
    // Column family name -> id, as handed to ColumnFamilyLogNumberMap.
    std::map<std::string, uint32_t> cf_name_id_map_;
    // Column family id -> keys recorded from the WAL, in encounter order.
    std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;

   public:
    void ColumnFamilyLogNumberMap(
        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
        const std::map<std::string, uint32_t>& cf_name_id_map) override {
      cf_log_number_map_ = cf_lognumber_map;
      cf_name_id_map_ = cf_name_id_map;
    }

    WalProcessingOption LogRecordFound(unsigned long long log_number,
                                       const std::string& /*log_file_name*/,
                                       const WriteBatch& batch,
                                       WriteBatch* /*new_batch*/,
                                       bool* /*batch_changed*/) override {
      // Records the key of every Put whose record's log number is not below
      // the log number registered for the Put's column family.
      class LogRecordBatchHandler : public WriteBatch::Handler {
       private:
        const std::map<uint32_t, uint64_t>& cf_log_number_map_;
        std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
        unsigned long long log_number_;

       public:
        LogRecordBatchHandler(
            unsigned long long current_log_number,
            const std::map<uint32_t, uint64_t>& cf_log_number_map,
            std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
            : cf_log_number_map_(cf_log_number_map),
              cf_wal_keys_(cf_wal_keys),
              log_number_(current_log_number) {}

        Status PutCF(uint32_t column_family_id, const Slice& key,
                     const Slice& /*value*/) override {
          auto it = cf_log_number_map_.find(column_family_id);
          assert(it != cf_log_number_map_.end());
          unsigned long long log_number_for_cf = it->second;
          if (log_number_ >= log_number_for_cf) {
            cf_wal_keys_[column_family_id].push_back(
                std::string(key.data(), key.size()));
          }
          return Status::OK();
        }
      } handler(log_number, cf_log_number_map_, cf_wal_keys_);

      Status s = batch.Iterate(&handler);
      if (!s.ok()) {
        return WalProcessingOption::kCorruptedRecord;
      }
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithColumnFamilies";
    }

    const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
      return cf_wal_keys_;
    }

    const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
      return cf_name_id_map_;
    }
  };

  // Keys written to both column families before the flush.
  const std::vector<std::vector<std::string>> batch_keys_pre_flush = {
      {"key1", "key2"}, {"key3", "key4"}, {"key5", "key6"}};

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      ASSERT_OK(batch.Put(handles_[0], batch_keys_pre_flush[i][j],
                          DummyString(1024)));
      ASSERT_OK(batch.Put(handles_[1], batch_keys_pre_flush[i][j],
                          DummyString(1024)));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  // Flush only the default column family.
  ASSERT_OK(db_->Flush(FlushOptions(), handles_[0]));

  // Keys written to both column families after the flush.
  const std::vector<std::vector<std::string>> batch_keys_post_flush = {
      {"key7", "key8"}, {"key9", "key10"}, {"key11", "key12"}};

  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      ASSERT_OK(batch.Put(handles_[0], batch_keys_post_flush[i][j],
                          DummyString(1024)));
      ASSERT_OK(batch.Put(handles_[1], batch_keys_post_flush[i][j],
                          DummyString(1024)));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  // Recover through the recording filter.
  TestWalFilterWithColumnFamilies test_wal_filter_column_families;
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_column_families;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_OK(status);

  // Work on copies so that operator[] can be used on the maps.
  auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
  auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();

  // Default column family: exactly the post-flush keys, in write order.
  size_t index = 0;
  auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      // Fail cleanly instead of reading past the end when the filter
      // recorded fewer keys than expected.
      ASSERT_LT(index, keys_cf.size());
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
    }
  }
  ASSERT_EQ(index, keys_cf.size());

  // "pikachu": the pre-flush keys followed by the post-flush keys.
  index = 0;
  keys_cf = cf_wal_keys[name_id_map["pikachu"]];
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      ASSERT_LT(index, keys_cf.size());
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_pre_flush[i][j]);
      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
    }
  }
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      ASSERT_LT(index, keys_cf.size());
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
    }
  }
  ASSERT_EQ(index, keys_cf.size());
}
class CompactionStallTestListener : public EventListener {
public:
CompactionStallTestListener()
: compacting_files_cnt_(0), compacted_files_cnt_(0) {}
void OnCompactionBegin(DB* , const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
compacting_files_cnt_ += ci.input_files.size();
}
void OnCompactionCompleted(DB* , const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
compacted_files_cnt_ += ci.input_files.size();
}
std::atomic<size_t> compacting_files_cnt_;
std::atomic<size_t> compacted_files_cnt_;
};
// Writes 4 L0 files (equal to the compaction trigger), passes sync point :0,
// writes 6 more L0 files, passes sync points :1..:3, waits for compactions to
// finish, and then checks the L0 file count and the listener's file tallies.
TEST_F(DBTest2, CompactionStall) {
// NOTE(review): each pair is presumably {predecessor, successor}, i.e. the
// second sync point blocks until the first has been reached -- confirm
// against SyncPoint::LoadDependency.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
{"DBTest2::CompactionStall:2",
"DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
{"DBTest2::CompactionStall:3",
"DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 4;
options.max_background_compactions = 40;
// The raw pointer is kept to read the counters at the end of the test; the
// same pointer is also handed to options.listeners via emplace_back.
CompactionStallTestListener* listener = new CompactionStallTestListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
// Hold a compaction-pressure token until the end of the test body.
// NOTE(review): presumably this raises the allowed compaction parallelism --
// confirm against the write controller.
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
Random rnd(301);
// First wave: 4 flushes of 10 random keys each, matching the L0 trigger of 4.
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(rnd.RandomString(10), rnd.RandomString(10)));
}
ASSERT_OK(Flush());
}
TEST_SYNC_POINT("DBTest2::CompactionStall:0");
// Reset the recorded sync-point trace before the second wave.
// NOTE(review): presumably so "DBImpl::BGWorkCompaction" must be reached
// again before :1 can pass -- confirm.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
// Second wave: 6 more flushes of 10 random keys each.
for (int i = 0; i < 6; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(rnd.RandomString(10), rnd.RandomString(10)));
}
ASSERT_OK(Flush());
}
TEST_SYNC_POINT("DBTest2::CompactionStall:1");
TEST_SYNC_POINT("DBTest2::CompactionStall:2");
TEST_SYNC_POINT("DBTest2::CompactionStall:3");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// L0 must have dropped below the trigger.
ASSERT_LT(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
// More than (10 written files - trigger) files must have been compacted.
ASSERT_GT(listener->compacted_files_cnt_.load(),
10 - options.level0_file_num_compaction_trigger);
// Every file reported at compaction begin must also be reported at completion.
ASSERT_EQ(listener->compacting_files_cnt_.load(),
listener->compacted_files_cnt_.load());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Takes a snapshot before any write has happened, writes two values that are
// each exactly write_buffer_size bytes long into column family 1, and then
// releases the snapshot. The test passes if none of these steps fails.
TEST_F(DBTest2, FirstSnapshotTest) {
  Options base_options;
  base_options.write_buffer_size = 100000;
  Options options = CurrentOptions(base_options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Snapshot taken on the still-empty database.
  const Snapshot* const initial_snapshot = db_->GetSnapshot();

  const std::string first_value(100000, 'x');
  const std::string second_value(100000, 'y');
  ASSERT_OK(Put(1, "k1", first_value));
  ASSERT_OK(Put(1, "k2", second_value));

  db_->ReleaseSnapshot(initial_snapshot);
}
// Takes several snapshots, some of them without any write in between, and
// checks that the snapshot list reports 4 sequence numbers and that the
// oldest write-conflict-boundary snapshot is the first one that was taken.
TEST_F(DBTest2, DuplicateSnapshot) {
  Options options;
  options = CurrentOptions(options);

  DBImpl* const db_impl = dbfull();
  std::vector<const Snapshot*> held_snapshots;

  // Write #1, followed by two regular snapshots with no write between them.
  ASSERT_OK(Put("k", "v"));
  held_snapshots.push_back(db_->GetSnapshot());
  held_snapshots.push_back(db_->GetSnapshot());

  // Write #2, followed by a regular snapshot and the first
  // write-conflict-boundary snapshot.
  ASSERT_OK(Put("k", "v"));
  held_snapshots.push_back(db_->GetSnapshot());
  held_snapshots.push_back(db_impl->GetSnapshotForWriteConflictBoundary());
  const SequenceNumber first_ww_snap =
      held_snapshots.back()->GetSequenceNumber();

  // Write #3, followed by a second write-conflict-boundary snapshot and a
  // regular snapshot.
  ASSERT_OK(Put("k", "v"));
  held_snapshots.push_back(db_impl->GetSnapshotForWriteConflictBoundary());
  held_snapshots.push_back(db_->GetSnapshot());

  // Write #4, followed by one more regular snapshot.
  ASSERT_OK(Put("k", "v"));
  held_snapshots.push_back(db_->GetSnapshot());

  {
    InstrumentedMutexLock guard(db_impl->mutex());
    SequenceNumber oldest_ww_snap;
    auto sequence_numbers = db_impl->snapshots().GetAll(&oldest_ww_snap);
    ASSERT_EQ(sequence_numbers.size(), 4);
    ASSERT_EQ(oldest_ww_snap, first_ww_snap);
  }

  for (const Snapshot* snapshot : held_snapshots) {
    db_->ReleaseSnapshot(snapshot);
  }
}
// Parameterized fixture for tests around
// pin_l0_filter_and_index_blocks_in_cache. The parameter tuple is
// (infinite_max_files, disallow_preload).
class PinL0IndexAndFilterBlocksTest
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  PinL0IndexAndFilterBlocksTest()
      : DBTestBase("db_pin_l0_index_bloom_test", true) {}

  // Unpacks the test parameter into the two member flags.
  void SetUp() override {
    const std::tuple<bool, bool>& params = GetParam();
    infinite_max_files_ = std::get<0>(params);
    disallow_preload_ = std::get<1>(params);
  }

  // Populates column family 1 ("pikachu") with one file at level 1 holding
  // keys "a"/"z" and one freshly flushed file holding keys "a2"/"z2".
  // `options` is updated in place (statistics, table factory, ...). When
  // `close_afterwards` is true the DB is closed before unreferenced block
  // cache entries are erased.
  void CreateTwoLevels(Options* options, bool close_afterwards) {
    if (infinite_max_files_) {
      options->max_open_files = -1;
    }
    options->create_if_missing = true;
    options->statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();

    BlockBasedTableOptions bbto;
    bbto.cache_index_and_filter_blocks = true;
    bbto.pin_l0_filter_and_index_blocks_in_cache = true;
    bbto.filter_policy.reset(NewBloomFilterPolicy(20));
    options->table_factory.reset(NewBlockBasedTableFactory(bbto));
    CreateAndReopenWithCF({"pikachu"}, *options);

    // First file: flush it and compact it down so that level 1 holds exactly
    // one file.
    ASSERT_OK(Put(1, "a", "begin"));
    ASSERT_OK(Put(1, "z", "end"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
    ASSERT_EQ(1, NumTableFilesAtLevel(1, 1));

    // Switch to a fresh 64KB block cache and reopen with it.
    bbto.block_cache = NewLRUCache(64 * 1024);
    options->table_factory.reset(NewBlockBasedTableFactory(bbto));
    ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, *options));

    // Second file: written and flushed after the reopen.
    ASSERT_OK(Put(1, "a2", "begin2"));
    ASSERT_OK(Put(1, "z2", "end2"));
    ASSERT_OK(Flush(1));

    if (close_afterwards) {
      Close();
    }
    // Drop every block cache entry that is no longer referenced.
    bbto.block_cache->EraseUnRefEntries();
  }

  // First tuple element: when true, max_open_files is set to -1.
  bool infinite_max_files_;
  // Second tuple element: when true, the tests cap max_open_files at reopen.
  bool disallow_preload_;
};
// Flushes a single key into column family 1 with index/filter caching and L0
// pinning enabled, then asserts that the block cache counters show one filter
// miss, one index miss and two cache adds, and that subsequent KeyMayExist and
// Get calls change neither the miss nor the hit counters.
TEST_P(PinL0IndexAndFilterBlocksTest,
IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
Options options = CurrentOptions();
if (infinite_max_files_) {
options.max_open_files = -1;
}
options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "val"));
ASSERT_OK(Flush(1));
// Expected state right after the flush: one miss each for filter and index,
// no hits, exactly two blocks added, and no data block miss.
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
// KeyMayExist must leave the filter/index miss and hit counters unchanged.
ASSERT_TRUE(db_->KeyMayExist(ReadOptions(), handles_[1], "key", nullptr));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// A Get must leave them unchanged as well; the returned value is not checked.
std::string value = Get(1, "key");
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
// Builds the two-file layout via CreateTwoLevels (DB left open), then checks
// how the filter/index block cache counters react to reading a key from each
// of the two files.
TEST_P(PinL0IndexAndFilterBlocksTest,
MultiLevelIndexAndFilterBlocksCachedWithPinning) {
Options options = CurrentOptions();
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
// Baseline counter values taken after setup.
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
std::string value;
// "a2" lives in the file flushed last by CreateTwoLevels; reading it must not
// change any of the four counters.
// NOTE(review): presumably because that file is at L0 and its blocks are
// pinned -- confirm.
value = Get(1, "a2");
ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// "a" lives in the file compacted to level 1 by CreateTwoLevels; reading it
// is expected to cost exactly one filter miss and one index miss.
value = Get(1, "a");
ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}
// Builds the two-file layout via CreateTwoLevels (DB closed afterwards),
// reopens the DB, and tracks the filter/index block cache counters across the
// reopen, two Gets, a manual compaction and a final Get. The expected counter
// values differ depending on disallow_preload_.
TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
Options options = CurrentOptions();
bool close_afterwards = true;
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);
// Baseline counter values taken after setup, while the DB is closed.
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
if (disallow_preload_) {
// Cap max_open_files at 13, both in the options and through the sync-point
// callback, which overwrites the int passed by the hooked code.
// NOTE(review): presumably the callback keeps option sanitization from
// raising the value again -- confirm against SanitizeOptions.
options.max_open_files = 13;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = 13;
});
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// After reopen: without the cap, one filter miss and one index miss are
// expected already; with the cap, no counter may have moved.
if (!disallow_preload_) {
ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
} else {
ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
std::string value;
// Read "a2" (from the file flushed last during setup). In both modes the
// totals must now be one filter miss and one index miss above baseline, with
// no hits.
value = Get(1, "a2");
ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// Read "a" (from the file compacted to level 1 during setup). Both modes
// expect a second filter miss and index miss; the capped mode additionally
// expects one filter hit and one index hit.
value = Get(1, "a");
if (!disallow_preload_) {
ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
} else {
ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
// Manually compact the whole key range of column family 1 and wait for it.
Compact(1, "a", "zzzzz");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// After compaction: a third filter/index miss in both modes; index hits rise
// to +2 (uncapped) or +3 (capped) above baseline.
if (!disallow_preload_) {
ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
} else {
ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
// Final read of "a": no new misses; one more filter hit and one more index
// hit in both modes.
value = Get(1, "a");
if (!disallow_preload_) {
ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
} else {
ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
}
// Parameter tuples are (infinite_max_files, disallow_preload), matching the
// order in which PinL0IndexAndFilterBlocksTest::SetUp unpacks them. The
// combination (true, true) is not instantiated.
INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
PinL0IndexAndFilterBlocksTest,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, false),
std::make_tuple(false, true)));
// Builds 8 files at the bottom level with a generous max_compaction_bytes,
// then lowers the limit to 3x the target file size, adds more data, and
// checks the resulting per-level file counts.
TEST_F(DBTest2, MaxCompactionBytesTest) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.compression = kNoCompression;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
      DBTestBase::kNumKeysByGenerateNewRandomFile));
  options.write_buffer_size = 200 << 10;
  options.arena_block_size = 4 << 10;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 100 << 10;
  // Phase 1: a limit of 100 target-sized files.
  options.max_compaction_bytes = options.target_file_size_base * 100;
  Reopen(options);

  Random rng(301);
  for (int file_idx = 0; file_idx < 8; ++file_idx) {
    GenerateNewRandomFile(&rng);
  }

  // Force everything down, including the bottommost level.
  CompactRangeOptions compact_opts;
  compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
  ASSERT_EQ("0,0,8", FilesPerLevel(0));

  // Phase 2: shrink the limit to 3 target-sized files and reopen.
  options.max_compaction_bytes = options.target_file_size_base * 3;
  Reopen(options);

  GenerateNewRandomFile(&rng);
  // Three more single-key flushes.
  for (int flush_idx = 0; flush_idx < 3; ++flush_idx) {
    ASSERT_OK(Put("a", "z"));
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Expect 4 files at level 1 while the 8 files at level 2 are untouched.
  ASSERT_EQ("0,4,8", FilesPerLevel(0));
}
// Sync-point callback for "GetUniqueIdFromFile:FS_IOC_GETVERSION". `arg` is
// treated as a pointer to an int; a value of -1 is rewritten to 0. The
// callback then clears the sync-point trace and registers itself again for
// the same sync point.
static void UniqueIdCallback(void* arg) {
  int& result = *static_cast<int*>(arg);
  if (result == -1) {
    result = 0;
  }

  auto* const sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->ClearTrace();
  sync_point->SetCallBack("GetUniqueIdFromFile:FS_IOC_GETVERSION",
                          UniqueIdCallback);
}
// In-memory PersistentCache used by tests. Pages are kept in an ordered map
// keyed by the page key. Whenever the tracked size exceeds max_size_ at the
// start of an insert, the entry with the smallest key is evicted first.
class MockPersistentCache : public PersistentCache {
 public:
  // Creates the cache and, as a side effect, enables sync-point processing and
  // installs UniqueIdCallback for "GetUniqueIdFromFile:FS_IOC_GETVERSION".
  explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
      : is_compressed_(is_compressed), max_size_(max_size) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  }

  ~MockPersistentCache() override = default;

  // No statistics are collected; always returns an empty StatsType.
  PersistentCache::StatsType Stats() override {
    return PersistentCache::StatsType();
  }

  // Returns a fresh id; ids start at 1 and increase by one per call.
  uint64_t NewId() override {
    return last_id_.fetch_add(1, std::memory_order_relaxed);
  }

  // Stores a copy of `data[0, size)` under `page_key`. If the key is already
  // present the existing entry is kept untouched. Always returns OK.
  Status Insert(const Slice& page_key, const char* data,
                const size_t size) override {
    MutexLock _(&lock_);

    // Evict a single entry (the smallest key) when over budget.
    if (size_ > max_size_ && !data_.empty()) {
      size_ -= data_.begin()->second.size();
      data_.erase(data_.begin());
    }

    // std::map::insert does not overwrite an existing key, so only account for
    // the bytes when the entry was really added; otherwise size_ would drift
    // above the actual content size and trigger spurious evictions.
    const auto result = data_.insert(
        std::make_pair(page_key.ToString(), std::string(data, size)));
    if (result.second) {
      size_ += size;
    }
    return Status::OK();
  }

  // Copies the page stored under `page_key` into a newly allocated buffer and
  // reports its length through `size`. Returns NotFound if the key is absent.
  Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                size_t* size) override {
    MutexLock _(&lock_);
    auto it = data_.find(page_key.ToString());
    if (it == data_.end()) {
      return Status::NotFound();
    }

    assert(page_key.ToString() == it->first);
    data->reset(new char[it->second.size()]);
    memcpy(data->get(), it->second.c_str(), it->second.size());
    *size = it->second.size();
    return Status::OK();
  }

  bool IsCompressed() override { return is_compressed_; }

  std::string GetPrintableOptions() const override {
    return "MockPersistentCache";
  }

  // Guards data_ and size_.
  port::Mutex lock_;
  // Page key -> page contents.
  std::map<std::string, std::string> data_;
  const bool is_compressed_ = true;
  // Sum of the sizes of the pages accounted for in data_.
  size_t size_ = 0;
  // Budget above which Insert evicts one entry before inserting.
  const size_t max_size_ = 10 * 1024;
  // Next id handed out by NewId().
  std::atomic<uint64_t> last_id_{1};
};
#ifdef OS_LINUX
// Checks the Get-related CPU perf counters at two perf levels: with
// kEnableTimeExceptForMutex no CPU time may be recorded, and with
// kEnableTimeAndCPUTimeExceptForMutex CPU time is recorded but must not
// include the mocked sleep injected into table lookup.
TEST_F(DBTest2, TestPerfContextGetCpuTime) {
// Shrink the table cache to zero capacity.
// NOTE(review): presumably so the table handle is not kept cached and
// "TableCache::FindTable:0" is reached during Get -- confirm.
dbfull()->TEST_table_cache()->SetCapacity(0);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
env_->now_cpu_count_.store(0);
env_->SetMockSleep();
// Phase 1: at this level neither get_cpu_nanos nor the env's CPU-clock call
// counter may move.
SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
ASSERT_EQ("bar", Get("foo"));
ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
ASSERT_EQ(0, env_->now_cpu_count_.load());
constexpr uint64_t kDummyAddonSeconds = uint64_t{1000000};
constexpr uint64_t kDummyAddonNanos = 1000000000U * kDummyAddonSeconds;
// Inject a mocked sleep of kDummyAddonSeconds at the table-lookup sync point.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TableCache::FindTable:0",
[&](void* ) { env_->MockSleepForSeconds(kDummyAddonSeconds); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Phase 2: CPU timing enabled. The CPU clock must have been read more than
// twice, the CPU time must stay below the injected sleep, and the table-lookup
// time must exceed it.
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
ASSERT_EQ("bar", Get("foo"));
ASSERT_GT(env_->now_cpu_count_.load(), 2);
ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonNanos);
ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonNanos);
SetPerfLevel(PerfLevel::kDisable);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Checks the iterator-related CPU perf counters at two perf levels: with
// kEnableTimeExceptForMutex no CPU time may be recorded for seek/next/prev,
// and with kEnableTimeAndCPUTimeExceptForMutex CPU time is recorded but must
// not include the mocked sleep injected into table lookup.
//
// The iterator is owned by a std::unique_ptr so that it is released even when
// an ASSERT_* macro returns from the test body early.
TEST_F(DBTest2, TestPerfContextIterCpuTime) {
  DestroyAndReopen(CurrentOptions());
  // Shrink the table cache to zero capacity.
  // NOTE(review): presumably so table handles are not kept cached and
  // "TableCache::FindTable:0" is reached by the iterator -- confirm.
  dbfull()->TEST_table_cache()->SetCapacity(0);

  const size_t kNumEntries = 10;
  for (size_t i = 0; i < kNumEntries; ++i) {
    ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
  }
  ASSERT_OK(Flush());
  for (size_t i = 0; i < kNumEntries; ++i) {
    ASSERT_EQ("v" + std::to_string(i), Get("k" + std::to_string(i)));
  }
  std::string last_key = "k" + std::to_string(kNumEntries - 1);
  std::string last_value = "v" + std::to_string(kNumEntries - 1);
  env_->now_cpu_count_.store(0);
  env_->SetMockSleep();

  // Phase 1: at this level none of the iterator CPU counters nor the env's
  // CPU-clock call counter may move.
  SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  iter->Seek("k0");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  iter->SeekForPrev(last_key);
  ASSERT_TRUE(iter->Valid());
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(last_value, iter->value().ToString());
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_seek_cpu_nanos);
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v1", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
  ASSERT_EQ(0, env_->now_cpu_count_.load());
  // Release the first iterator before the sync-point callback is installed.
  iter.reset();

  constexpr uint64_t kDummyAddonSeconds = uint64_t{1000000};
  constexpr uint64_t kDummyAddonNanos = 1000000000U * kDummyAddonSeconds;

  // Inject a mocked sleep of kDummyAddonSeconds at the table-lookup sync
  // point.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::FindTable:0",
      [&](void* /*arg*/) { env_->MockSleepForSeconds(kDummyAddonSeconds); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Phase 2: CPU timing enabled. Each CPU counter must be positive but stay
  // below the injected sleep.
  SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
  iter.reset(db_->NewIterator(ReadOptions()));
  iter->Seek("k0");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  iter->SeekForPrev(last_key);
  ASSERT_TRUE(iter->Valid());
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(last_value, iter->value().ToString());
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonNanos);
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("v1", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonNanos);
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  ASSERT_EQ("v0", iter->value().ToString());
  ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
  ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonNanos);
  // The CPU clock must have been read at least 12 times, and the table-lookup
  // time must exceed the injected sleep.
  ASSERT_GE(env_->now_cpu_count_.load(), 12);
  ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonNanos);

  SetPerfLevel(PerfLevel::kDisable);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  iter.reset();
}
#endif
#if !defined OS_SOLARIS
// Runs a write/flush/read cycle against a MockPersistentCache for every
// combination of block cache size (none, 1MB) and compressed flag
// (true, false), and checks that the persistent cache recorded both hits and
// misses. The statistics object is shared by all combinations, so the ticker
// counts accumulate across iterations.
TEST_F(DBTest2, PersistentCache) {
  const int kNumKeys = 80;

  Options options;
  options.write_buffer_size = 64 * 1024;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options = CurrentOptions(options);

  for (const int block_cache_bytes : {0, 1 * 1024 * 1024}) {
    for (const bool compressed : {true, false}) {
      // Table options for the "pikachu" column family: persistent cache plus,
      // optionally, an LRU block cache.
      BlockBasedTableOptions table_options;
      table_options.persistent_cache.reset(
          new MockPersistentCache(compressed, 10 * 1024));
      table_options.no_block_cache = true;
      if (block_cache_bytes != 0) {
        table_options.block_cache = NewLRUCache(block_cache_bytes);
      } else {
        table_options.block_cache = nullptr;
      }
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));

      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);

      // The default column family gets options without any block cache but
      // shares the statistics object.
      Options default_cf_options;
      default_cf_options.statistics = options.statistics;
      default_cf_options = CurrentOptions(default_cf_options);
      BlockBasedTableOptions default_cf_table_options;
      default_cf_table_options.no_block_cache = true;
      default_cf_options.table_factory.reset(
          NewBlockBasedTableFactory(default_cf_table_options));
      ReopenWithColumnFamilies(
          {"default", "pikachu"},
          std::vector<Options>({default_cf_options, options}));

      Random rnd(301);
      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);

      // Write kNumKeys entries; a new random 1000-byte value is generated for
      // every fourth key and reused for the three keys that follow.
      std::vector<std::string> expected_values;
      std::string current_value;
      for (int i = 0; i < kNumKeys; ++i) {
        if (i % 4 == 0) {
          current_value = rnd.RandomString(1000);
        }
        expected_values.push_back(current_value);
        ASSERT_OK(Put(1, Key(i), expected_values.back()));
      }
      ASSERT_OK(Flush(1));

      // Read everything back from the flushed file.
      for (int i = 0; i < kNumKeys; ++i) {
        ASSERT_EQ(Get(1, Key(i)), expected_values[i]);
      }

      const auto hits =
          options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
      const auto misses =
          options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);
      ASSERT_GT(hits, 0);
      ASSERT_GT(misses, 0);
    }
  }
}
#endif
namespace {
// Fires the "DBTest2::MarkedPoint" sync-point callback with a null argument.
// Used by the SyncPointMarker test, which counts how often the callback runs.
void CountSyncPoint() {
TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr );
}
}  // namespace
// Two threads each call CountSyncPoint() once; the test expects the
// "DBTest2::MarkedPoint" callback to have run exactly once in total.
TEST_F(DBTest2, SyncPointMarker) {
std::atomic<int> sync_point_called(0);
// Count every execution of the MarkedPoint callback.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBTest2::MarkedPoint",
[&](void* ) { sync_point_called.fetch_add(1); });
// First list: ordering dependencies; second list: markers.
// NOTE(review): presumably the dependency makes MarkedPoint wait for
// Thread1First, and the marker restricts MarkedPoint to the thread that
// passed Marker, which would disable thread 1's call -- confirm against
// SyncPoint::LoadDependencyAndMarkers.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
{{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
{{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Thread 1: hits MarkedPoint first, then Thread1First.
std::function<void()> func1 = [&]() {
CountSyncPoint();
TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
};
// Thread 2: hits Marker first, then MarkedPoint.
std::function<void()> func2 = [&]() {
TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
CountSyncPoint();
};
auto thread1 = port::Thread(func1);
auto thread2 = port::Thread(func2);
thread1.join();
thread2.join();
// Only one of the two CountSyncPoint() calls may have triggered the callback.
ASSERT_EQ(sync_point_called.load(), 1);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Returns the number of bytes taken by an entry made of three varint32 header
// fields (the constant 0, `key_size` and `value_size`, each truncated to 32
// bits) followed by `key_size` key bytes and `value_size` value bytes.
// NOTE(review): the leading 0 is presumably the shared-prefix length of a
// block entry written without delta encoding -- confirm.
size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
  std::string header;
  for (const size_t field : {size_t{0}, key_size, value_size}) {
    PutVarint32(&header, static_cast<uint32_t>(field));
  }
  const size_t header_bytes = header.size();
  return header_bytes + key_size + value_size;
}
// For read_amp_bytes_per_bit of 1 and 16: writes 10000 entries, reads random
// keys while comparing the reported read-amplification ratio against one
// computed by the test, then scans the whole DB and checks that the estimated
// useful bytes (nearly) equal the total bytes read.
//
// The iterator is owned by a std::unique_ptr so that it is released even when
// an ASSERT_* macro returns from the test body early.
TEST_F(DBTest2, ReadAmpBitmap) {
  Options options = CurrentOptions();
  BlockBasedTableOptions bbto;
  uint32_t bytes_per_bit[2] = {1, 16};
  for (size_t k = 0; k < 2; k++) {
    // Delta encoding is disabled so that the entry size computed by
    // GetEncodedEntrySize (which writes a 0 as first field) applies.
    bbto.use_delta_encoding = false;
    // 1GB block cache.
    bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
    bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    DestroyAndReopen(options);

    const size_t kNumEntries = 10000;

    Random rnd(301);
    for (size_t i = 0; i < kNumEntries; i++) {
      ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    Close();
    Reopen(options);

    // Read random keys. Each key contributes to total_useful_bytes only the
    // first time it is read. After every read the reported ratio must be
    // within 2 percentage points of the expected one.
    uint64_t total_useful_bytes = 0;
    std::set<int> read_keys;
    std::string value;
    for (size_t i = 0; i < kNumEntries * 5; i++) {
      int key_idx = rnd.Next() % kNumEntries;
      std::string key = Key(key_idx);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));

      if (read_keys.find(key_idx) == read_keys.end()) {
        auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
        total_useful_bytes +=
            GetEncodedEntrySize(internal_key.size(), value.size());
        read_keys.insert(key_idx);
      }

      double expected_read_amp =
          static_cast<double>(total_useful_bytes) /
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

      double read_amp =
          static_cast<double>(options.statistics->getTickerCount(
              READ_AMP_ESTIMATE_USEFUL_BYTES)) /
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

      double error_pct = fabs(expected_read_amp - read_amp) * 100;
      // Non-fatal check: a violation is reported but the loop continues.
      EXPECT_LE(error_pct, 2);
    }

    // Scan every entry, cross-checking each value against a point lookup. The
    // iterator is scoped so it is destroyed before the final ticker checks.
    {
      std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
      }
      ASSERT_OK(iter->status());
    }

    // After reading everything, useful bytes must equal total bytes exactly
    // for 1 byte per bit, and be within 1% for 16 bytes per bit.
    if (k == 0) {
      ASSERT_EQ(
          options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
          options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
    } else {
      ASSERT_NEAR(
          options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES) *
              1.0f /
              options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
          1, .01);
    }
  }
}
#ifndef OS_SOLARIS
// Reads the even keys, closes and reopens the DB with the same block cache but
// a fresh statistics object, reads the odd keys, and checks that the combined
// useful bytes across both statistics objects (nearly) equal the combined
// total bytes read.
TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
  {
    // Bail out (test trivially passes) when GetUniqueId reports 0 for the DB
    // path, first via a random-access file and, if that could not be opened or
    // on Windows, via a directory handle.
    const int kIdBufLen = 100;
    char id_buf[kIdBufLen];
    Status s = Status::NotSupported();
#ifndef OS_WIN
    std::unique_ptr<RandomAccessFile> file;
    s = env_->NewRandomAccessFile(dbname_, &file, EnvOptions());
    if (s.ok()) {
      if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
        return;
      }
    }
#endif
    if (!s.ok()) {
      std::unique_ptr<Directory> dir;
      ASSERT_OK(env_->NewDirectory(dbname_, &dir));
      if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
        return;
      }
    }
  }

  const uint32_t bytes_per_bit[2] = {1, 16};
  for (size_t k = 0; k < 2; k++) {
    // The cache outlives the DB instances opened below.
    std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
    std::shared_ptr<Statistics> stats = ROCKSDB_NAMESPACE::CreateDBStatistics();

    Options options = CurrentOptions();
    BlockBasedTableOptions bbto;
    bbto.use_delta_encoding = false;
    bbto.block_cache = lru_cache;
    bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options.statistics = stats;
    DestroyAndReopen(options);

    const int kNumEntries = 10000;

    Random rnd(301);
    for (int i = 0; i < kNumEntries; i++) {
      ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    Close();
    Reopen(options);

    // Pass 1: read every even key.
    std::string value;
    for (int i = 0; i < kNumEntries; i += 2) {
      const std::string key = Key(i);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));
    }

    // Ticker counts are 64-bit; keep them as uint64_t to avoid narrowing.
    const uint64_t total_useful_bytes_iter1 =
        options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
    const uint64_t total_loaded_bytes_iter1 =
        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

    // Reopen with a fresh statistics object; the block cache is unchanged.
    Close();
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    Reopen(options);

    // Pass 2: read every odd key.
    for (int i = 1; i < kNumEntries; i += 2) {
      const std::string key = Key(i);
      ASSERT_OK(db_->Get(ReadOptions(), key, &value));
    }

    const uint64_t total_useful_bytes_iter2 =
        options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
    const uint64_t total_loaded_bytes_iter2 =
        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);

    // Combined over both passes, useful bytes must equal loaded bytes exactly
    // for 1 byte per bit, and be within 1% for 16 bytes per bit.
    if (k == 0) {
      ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
                total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
    } else {
      ASSERT_NEAR((total_useful_bytes_iter1 + total_useful_bytes_iter2) * 1.0f /
                      (total_loaded_bytes_iter1 + total_loaded_bytes_iter2),
                  1, .01);
    }
  }
}
#endif
// Moves two files to level 2, then starts a non-exclusive manual compaction
// and, from the "CompactionJob::Run():Start" callback, changes options and
// flushes two more files whose key ranges fall inside the existing data. In
// the end level 2 is expected to hold a single file.
TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
  Options options = CurrentOptions();
  options.num_levels = 3;
  options.IncreaseParallelism(20);
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "a"));
  ASSERT_OK(Put(Key(5), "a"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(Key(10), "a"));
  ASSERT_OK(Put(Key(15), "a"));
  ASSERT_OK(Flush());

  // Compact everything and move the result to level 2.
  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 2;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // Looks up "compaction.<level_str>.<property name of type>" in `props` and
  // returns it parsed as a double, or 0 when the property is absent. The
  // arguments are taken by const reference so the map is not copied per call.
  auto get_stat = [](const std::string& level_str, LevelStatType type,
                     const std::map<std::string, std::string>& props) {
    auto prop_str =
        "compaction." + level_str + "." +
        InternalStats::compaction_level_stats.at(type).property_name.c_str();
    auto prop_item = props.find(prop_str);
    return prop_item == props.end() ? 0 : std::stod(prop_item->second);
  };

  // Initial layout: both files at level 2, confirmed via FilesPerLevel and via
  // the "rocksdb.cfstats" map property.
  ASSERT_EQ("0,0,2", FilesPerLevel());
  {
    std::map<std::string, std::string> prop;
    ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
    ASSERT_EQ(0, get_stat("L0", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(0, get_stat("L1", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(2, get_stat("L2", LevelStatType::NUM_FILES, prop));
    ASSERT_EQ(2, get_stat("Sum", LevelStatType::NUM_FILES, prop));
  }

  // When a compaction job starts: lower the L0 trigger to 2 and the level base
  // size to 1 byte, then flush two new files (keys 6-7 and 8-9).
  // NOTE(review): presumably this makes an automatic compaction eligible
  // while the manual one is running -- confirm.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        ASSERT_OK(
            dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
                                  {"max_bytes_for_level_base", "1"}}));
        ASSERT_OK(Put(Key(6), "a"));
        ASSERT_OK(Put(Key(7), "a"));
        ASSERT_OK(Flush());

        ASSERT_OK(Put(Key(8), "a"));
        ASSERT_OK(Put(Key(9), "a"));
        ASSERT_OK(Flush());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Non-exclusive manual compaction over the full key range; cro still has
  // change_level = true and target_level = 2 from above.
  cro.exclusive_manual_compaction = false;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Level 2 must end up with exactly one file.
  {
    std::map<std::string, std::string> prop;
    ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
    ASSERT_EQ(1, get_stat("L2", LevelStatType::NUM_FILES, prop));
  }
}
// Starts a non-exclusive manual compaction over the full key range and, from
// the first "CompactionJob::Run():Start" callback, flushes two more files and
// launches a second non-exclusive manual compaction over [Key(6), Key(9)] on
// another thread. Both CompactRange calls are expected to return OK.
TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
Options options = CurrentOptions();
options.num_levels = 2;
options.IncreaseParallelism(20);
options.disable_auto_compactions = true;
DestroyAndReopen(options);
ASSERT_OK(Put(Key(0), "a"));
ASSERT_OK(Put(Key(5), "a"));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(10), "a"));
ASSERT_OK(Put(Key(15), "a"));
ASSERT_OK(Flush());
// Initial layout: both files compacted to level 1.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,2", FilesPerLevel());
// Body of the second manual compaction; it runs on bg_thread.
std::function<void()> bg_manual_compact = [&]() {
std::string k1 = Key(6);
std::string k2 = Key(9);
Slice k1s(k1);
Slice k2s(k2);
CompactRangeOptions cro;
cro.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
};
ROCKSDB_NAMESPACE::port::Thread bg_thread;
// Ensures the callback body below runs only for the first compaction job.
std::atomic<bool> flag(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():Start", [&](void* ) {
if (flag.exchange(true)) {
return;
}
// Flush two new files (keys 6-7 and 8-9), start the second manual
// compaction, then sleep one second before letting the first job continue.
ASSERT_OK(Put(Key(6), "a"));
ASSERT_OK(Put(Key(7), "a"));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(8), "a"));
ASSERT_OK(Put(Key(9), "a"));
ASSERT_OK(Flush());
bg_thread = port::Thread(bg_manual_compact);
env_->SleepForMicroseconds(1000000);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// First manual compaction: non-exclusive, over the full key range.
CompactRangeOptions cro;
cro.exclusive_manual_compaction = false;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// bg_thread is only assigned inside the callback above.
bg_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Uses sync-point callbacks to make manual compactions report "paused" and
// checks that neither CompactRange nor CompactFiles changes the set of live
// files in that case.
TEST_F(DBTest2, PausingManualCompaction1) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.num_levels = 7;
DestroyAndReopen(options);
Random rnd(301);
// Two flushed files that both cover keys 0..9.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
// Number of times the compaction-job callback below has run.
int manual_compactions_paused = 0;
// Compaction-job hook: sets the atomic<bool> passed as arg (when non-null) to
// true and counts the invocation.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
auto canceled = static_cast<std::atomic<bool>*>(arg);
if (canceled != nullptr) {
canceled->store(true, std::memory_order_release);
}
manual_compactions_paused += 1;
});
// CompactFiles hook: expects the atomic<int> passed as arg to be 0 and
// increments it.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Record the live file names before any compaction attempt.
std::vector<std::string> files_before_compact, files_after_compact;
std::vector<LiveFileMetaData> files_meta;
dbfull()->GetLiveFilesMetaData(&files_meta);
for (const auto& file : files_meta) {
files_before_compact.push_back(file.name);
}
// CompactRange must report "manual compaction paused".
ASSERT_TRUE(dbfull()
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
.IsManualCompactionPaused());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
files_meta.clear();
dbfull()->GetLiveFilesMetaData(&files_meta);
for (const auto& file : files_meta) {
files_after_compact.push_back(file.name);
}
// Live files are unchanged and the compaction-job hook ran exactly once.
ASSERT_EQ(files_before_compact, files_after_compact);
ASSERT_EQ(manual_compactions_paused, 1);
manual_compactions_paused = 0;
// CompactFiles on the same files must report "manual compaction paused" too.
ASSERT_TRUE(dbfull()
->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
files_before_compact, 0)
.IsManualCompactionPaused());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
files_meta.clear();
files_after_compact.clear();
dbfull()->GetLiveFilesMetaData(&files_meta);
for (const auto& file : files_meta) {
files_after_compact.push_back(file.name);
}
// Live files are unchanged and the compaction-job hook did not run this time.
ASSERT_EQ(files_before_compact, files_after_compact);
ASSERT_EQ(manual_compactions_paused, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// With manual compaction disabled on the DB and automatic compaction left
// enabled, two flushed files at an L0 trigger of 2 are expected to end up as
// a single live file.
TEST_F(DBTest2, PausingManualCompaction2) {
  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = 2;
  opts.disable_auto_compactions = false;
  DestroyAndReopen(opts);
  dbfull()->DisableManualCompaction();

  constexpr int kNumFiles = 2;
  constexpr int kKeysPerFile = 100;
  Random rnd(301);
  for (int file = 0; file < kNumFiles; ++file) {
    for (int key = 0; key < kKeysPerFile; ++key) {
      ASSERT_OK(Put(Key(key), rnd.RandomString(50)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  std::vector<LiveFileMetaData> live_files;
  dbfull()->GetLiveFilesMetaData(&live_files);
  ASSERT_EQ(live_files.size(), 1);
}
// DisableManualCompaction() must make CompactRange() return "paused" without
// any compaction job running; after EnableManualCompaction() the same call
// must succeed and push everything to the last level.
TEST_F(DBTest2, PausingManualCompaction3) {
  const std::string kInitialLayout = "2,3,4,5,6,7,8";
  const std::string kCompactedLayout = "0,0,0,0,0,0,2";
  const char* const kRunPoint =
      "CompactionJob::Run():PausingManualCompaction:1";

  CompactRangeOptions cro;
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.num_levels = 7;

  Random rnd(301);
  // Flushes a shrinking number of files per round and moves them down so
  // that every level ends up populated.
  auto populate_levels = [&]() {
    for (int round = 0; round < opts.num_levels; ++round) {
      const int files_this_round = opts.num_levels - round + 1;
      for (int file = 0; file < files_this_round; ++file) {
        for (int k = 0; k < 1000; ++k) {
          ASSERT_OK(Put(Key(k + file * 1000), rnd.RandomString(50)));
        }
        ASSERT_OK(Flush());
      }
      for (int level = 1; level < opts.num_levels - round; ++level) {
        MoveFilesToLevel(level);
      }
    }
  };

  DestroyAndReopen(opts);
  populate_levels();
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  int jobs_started = 0;
  sync_point->SetCallBack(kRunPoint, [&](void* /*arg*/) { ++jobs_started; });
  sync_point->EnableProcessing();

  // Disabled: the request is rejected and no job reaches the sync point.
  dbfull()->DisableManualCompaction();
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(jobs_started, 0);
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  // Re-enabled: the full-range compaction now goes through.
  sync_point->ClearCallBack(kRunPoint);
  dbfull()->EnableManualCompaction();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(kCompactedLayout, FilesPerLevel());

  sync_point->DisableProcessing();
}
// Cancels a manual compaction from the second pause sync point inside
// CompactionJob::Run(): the first CompactRange() must report "paused" and
// leave the LSM shape untouched; once the callback is removed the compaction
// must complete.
TEST_F(DBTest2, PausingManualCompaction4) {
  const std::string kInitialLayout = "2,3,4,5,6,7,8";
  const std::string kCompactedLayout = "0,0,0,0,0,0,2";
  const char* const kCancelPoint =
      "CompactionJob::Run():PausingManualCompaction:2";

  CompactRangeOptions cro;
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.num_levels = 7;

  Random rnd(301);
  // Flushes a shrinking number of files per round and moves them down so
  // that every level ends up populated.
  auto populate_levels = [&]() {
    for (int round = 0; round < opts.num_levels; ++round) {
      const int files_this_round = opts.num_levels - round + 1;
      for (int file = 0; file < files_this_round; ++file) {
        for (int k = 0; k < 1000; ++k) {
          ASSERT_OK(Put(Key(k + file * 1000), rnd.RandomString(50)));
        }
        ASSERT_OK(Flush());
      }
      for (int level = 1; level < opts.num_levels - round; ++level) {
        MoveFilesToLevel(level);
      }
    }
  };

  DestroyAndReopen(opts);
  populate_levels();
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  int cancel_hits = 0;
  sync_point->SetCallBack(kCancelPoint, [&](void* arg) {
    // The argument is treated as the cancel flag; set it when provided.
    if (auto* cancel_flag = static_cast<std::atomic<bool>*>(arg)) {
      cancel_flag->store(true, std::memory_order_release);
    }
    ++cancel_hits;
  });
  sync_point->SetCallBack(
      "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
        auto* pause_counter = static_cast<std::atomic<int>*>(arg);
        ASSERT_EQ(0, pause_counter->load(std::memory_order_acquire));
        pause_counter->fetch_add(1, std::memory_order_release);
      });
  sync_point->EnableProcessing();

  // Cancelled once from inside the job.
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(cancel_hits, 1);
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  // Without the cancelling callback the compaction runs to completion.
  sync_point->ClearCallBack(kCancelPoint);
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(kCompactedLayout, FilesPerLevel());

  sync_point->DisableProcessing();
}
// Drives CompactRangeOptions::canceled through three phases:
//   1. flag already set before the call  -> paused, nothing scheduled;
//   2. flag set on the third RunManualCompaction sync-point hit -> paused;
//   3. flag cleared and callbacks removed -> compaction completes.
TEST_F(DBTest2, CancelManualCompaction1) {
  const std::string kInitialLayout = "2,3,4,5,6,7,8";
  const std::string kCompactedLayout = "0,0,0,0,0,0,2";
  const char* const kJobRunPoint =
      "CompactionJob::Run():PausingManualCompaction:1";
  const char* const kManualRunPoint = "DBImpl::RunManualCompaction()::1";

  CompactRangeOptions cro;
  auto cancel_owner = std::make_unique<std::atomic<bool>>(true);
  cro.canceled = cancel_owner.get();

  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.num_levels = 7;

  Random rnd(301);
  // Flushes a shrinking number of files per round and moves them down so
  // that every level ends up populated.
  auto populate_levels = [&]() {
    for (int round = 0; round < opts.num_levels; ++round) {
      const int files_this_round = opts.num_levels - round + 1;
      for (int file = 0; file < files_this_round; ++file) {
        for (int k = 0; k < 1000; ++k) {
          ASSERT_OK(Put(Key(k + file * 1000), rnd.RandomString(50)));
        }
        ASSERT_OK(Flush());
      }
      for (int level = 1; level < opts.num_levels - round; ++level) {
        MoveFilesToLevel(level);
      }
    }
  };

  DestroyAndReopen(opts);
  populate_levels();
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  SyncPoint* const sync_point = SyncPoint::GetInstance();

  int jobs_started = 0;
  sync_point->SetCallBack(kJobRunPoint,
                          [&](void* /*arg*/) { ++jobs_started; });
  sync_point->EnableProcessing();

  int manual_runs = 0;
  sync_point->SetCallBack(kManualRunPoint,
                          [&](void* /*arg*/) { ++manual_runs; });

  // Phase 1: the flag is true from construction.
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(manual_runs, 0);
  ASSERT_EQ(jobs_started, 0);
  ASSERT_EQ(kInitialLayout, FilesPerLevel());

  // Phase 2: cancel on the third hit of the manual-compaction sync point.
  manual_runs = 0;
  sync_point->ClearCallBack(kManualRunPoint);
  sync_point->SetCallBack(kManualRunPoint, [&](void* /*arg*/) {
    if (++manual_runs == 3) {
      cro.canceled->store(true, std::memory_order_release);
    }
  });
  cro.canceled->store(false, std::memory_order_release);
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(manual_runs, 3);

  // Phase 3: no callbacks, flag cleared.
  sync_point->ClearCallBack(kManualRunPoint);
  sync_point->ClearCallBack(kJobRunPoint);
  cro.canceled->store(false, std::memory_order_relaxed);
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(kCompactedLayout, FilesPerLevel());

  sync_point->DisableProcessing();
}
// Cancels a manual compaction from inside the compaction iterator: the cancel
// flag is raised when the "CompactionIterator:ProcessKV" sync point has been
// hit for the sixth time (pre-increment counter value 5). The test then
// asserts that exactly one more key/value was processed after the cancel and
// that no further manual compaction round was started. Finally, with the flag
// cleared and the callbacks removed, the compaction must run to completion.
TEST_F(DBTest2, CancelManualCompaction2) {
  CompactRangeOptions compact_options;
  auto canceledPtr =
      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
  compact_options.canceled = canceledPtr.get();
  compact_options.max_subcompactions = 1;

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.num_levels = 7;

  Random rnd(301);
  // Flushes a shrinking number of files per round and moves them down so
  // that every level ends up populated.
  auto generate_files = [&]() {
    for (int i = 0; i < options.num_levels; i++) {
      for (int j = 0; j < options.num_levels - i + 1; j++) {
        for (int k = 0; k < 1000; k++) {
          ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
        }
        ASSERT_OK(Flush());
      }

      for (int l = 1; l < options.num_levels - i; l++) {
        MoveFilesToLevel(l);
      }
    }
  };

  DestroyAndReopen(options);
  generate_files();
  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  int compactions_run = 0;
  std::atomic<int> kv_compactions{0};
  int compactions_stopped_at = 0;
  int kv_compactions_stopped_at = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RunManualCompaction()::1",
      [&](void* /*arg*/) { ++compactions_run; });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator:ProcessKV", [&](void* /*arg*/) {
        int kv_compactions_run =
            kv_compactions.fetch_add(1, std::memory_order_release);
        if (kv_compactions_run == 5) {
          // Raise the cancel flag and remember how far things had progressed.
          compact_options.canceled->store(true, std::memory_order_release);
          kv_compactions_stopped_at = kv_compactions_run;
          compactions_stopped_at = compactions_run;
        }
      });

  compact_options.canceled->store(false, std::memory_order_release);
  ASSERT_TRUE(dbfull()
                  ->CompactRange(compact_options, nullptr, nullptr)
                  .IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Exactly one more key/value was counted after the cancel was issued, and
  // no additional manual compaction round was started.
  ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1);
  ASSERT_EQ(compactions_run, compactions_stopped_at);

  // Remove the callbacks under the exact names they were registered with.
  // (The name passed here previously used "::" instead of ":", so the
  // ProcessKV callback was never actually removed.)
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "CompactionIterator:ProcessKV");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "DBImpl::RunManualCompaction()::1");

  // With the flag cleared the compaction runs to completion.
  compact_options.canceled->store(false, std::memory_order_relaxed);
  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// EventListener used by the cancellation tests. It counts compaction
// begin/completed notifications and asserts that every notification is for
// the "default" column family with base input level 0. On completion it also
// asserts that the reported status matches the expected `code_`/`subcode_`,
// which the owning test sets before triggering a compaction.
class CancelCompactionListener : public EventListener {
 public:
  CancelCompactionListener()
      : num_compaction_started_(0), num_compaction_ended_(0) {}

  void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    num_compaction_started_++;
  }

  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.status.code(), code_);
    ASSERT_EQ(ci.status.subcode(), subcode_);
    num_compaction_ended_++;
  }

  // Number of OnCompactionBegin / OnCompactionCompleted notifications seen.
  std::atomic<size_t> num_compaction_started_;
  std::atomic<size_t> num_compaction_ended_;

  // Expected completion status. Defaulted to "OK / no subcode" so that the
  // comparison in OnCompactionCompleted never reads an indeterminate value
  // when a compaction finishes before the test has assigned them.
  Status::Code code_ = Status::kOk;
  Status::SubCode subcode_ = Status::SubCode::kNone;
};
// Checks the listener notifications around cancelled manual compactions:
//   1. cancel raised from the compaction iterator -> paused, begin/completed
//      notifications balanced, no output file finished;
//   2. flag still set on entry -> paused, no notifications at all;
//   3. cancel raised only right before verification -> compaction succeeds.
TEST_F(DBTest2, CancelManualCompactionWithListener) {
  const char* const kProcessKvPoint = "CompactionIterator:ProcessKV";

  CompactRangeOptions cro;
  auto cancel_owner = std::make_unique<std::atomic<bool>>(true);
  cro.canceled = cancel_owner.get();
  cro.max_subcompactions = 1;

  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  auto listener = std::make_shared<CancelCompactionListener>();
  opts.listeners.emplace_back(listener);
  DestroyAndReopen(opts);

  // Ten flushed files of ten keys each.
  Random rnd(301);
  for (int file = 0; file < 10; ++file) {
    for (int n = 0; n < 10; ++n) {
      ASSERT_OK(Put(Key(file + n * 10), rnd.RandomString(50)));
    }
    ASSERT_OK(Flush());
  }

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->EnableProcessing();

  sync_point->SetCallBack(kProcessKvPoint, [&](void* /*arg*/) {
    cro.canceled->store(true, std::memory_order_release);
  });

  int finished_outputs = 0;
  sync_point->SetCallBack("CompactionJob::FinishCompactionOutputFile1",
                          [&](void* /*arg*/) { ++finished_outputs; });

  // Case 1: cancelled while iterating over key/values.
  listener->code_ = Status::kIncomplete;
  listener->subcode_ = Status::SubCode::kManualCompactionPaused;
  cro.canceled->store(false, std::memory_order_release);
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_GT(listener->num_compaction_started_, 0);
  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
  ASSERT_EQ(finished_outputs, 0);

  // Case 2: the flag was left set by case 1, so nothing is even started.
  listener->num_compaction_started_ = 0;
  listener->num_compaction_ended_ = 0;
  ASSERT_TRUE(
      dbfull()->CompactRange(cro, nullptr, nullptr).IsManualCompactionPaused());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(listener->num_compaction_started_, 0);
  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
  ASSERT_EQ(finished_outputs, 0);

  // Case 3: the cancel is raised just before verification; the test expects
  // the compaction to complete successfully regardless.
  sync_point->ClearCallBack(kProcessKvPoint);
  sync_point->SetCallBack("CompactionJob::Run:BeforeVerify",
                          [&](void* /*arg*/) {
                            cro.canceled->store(true,
                                                std::memory_order_release);
                          });
  listener->code_ = Status::kOk;
  listener->subcode_ = Status::SubCode::kNone;
  cro.canceled->store(false, std::memory_order_release);
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_GT(listener->num_compaction_started_, 0);
  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
  ASSERT_GT(finished_outputs, 0);

  sync_point->ClearAllCallBacks();
  sync_point->DisableProcessing();
}
// Universal compaction with only a BOTTOM-priority background thread
// available: the test expects the compaction to be forwarded to the bottom
// pool, exactly one compaction job to run, and the listener to observe
// matching begin/completed notifications with an OK status.
TEST_F(DBTest2, CompactionOnBottomPriorityWithListener) {
  const int kNumLevels = 3;
  const int kNumFilesTrigger = 4;

  Options opts = CurrentOptions();
  env_->SetBackgroundThreads(0, Env::Priority::HIGH);
  env_->SetBackgroundThreads(0, Env::Priority::LOW);
  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  opts.env = env_;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.num_levels = kNumLevels;
  opts.write_buffer_size = 100 << 10;
  opts.target_file_size_base = 32 << 10;
  opts.level0_file_num_compaction_trigger = kNumFilesTrigger;
  opts.compaction_options_universal.max_size_amplification_percent = 110;

  auto* listener = new CancelCompactionListener();
  opts.listeners.emplace_back(listener);
  DestroyAndReopen(opts);

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->EnableProcessing();

  int forwarded_to_bottom = 0;
  sync_point->SetCallBack(
      "DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
      [&](void* /*arg*/) { ++forwarded_to_bottom; });

  int finished_jobs = 0;
  sync_point->SetCallBack("CompactionJob::Run():End",
                          [&](void* /*arg*/) { ++finished_jobs; });

  listener->code_ = Status::kOk;
  listener->subcode_ = Status::SubCode::kNone;

  // Produce exactly enough files to reach the compaction trigger once.
  Random rnd(301);
  for (int file = 0; file < kNumFilesTrigger; ++file) {
    int key_idx = 0;
    GenerateNewFile(&rnd, &key_idx, true);
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(forwarded_to_bottom, 0);
  ASSERT_EQ(finished_jobs, 1);
  ASSERT_GT(listener->num_compaction_started_, 0);
  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);

  sync_point->ClearAllCallBacks();
  sync_point->DisableProcessing();
}
// Smoke test: a DB opened with OptimizeForPointLookup(2) must return a
// written value both before and after a flush.
TEST_F(DBTest2, OptimizeForPointLookup) {
  const std::string kKey = "foo";
  const std::string kValue = "v1";

  Options opts = CurrentOptions();
  Close();
  opts.OptimizeForPointLookup(2);
  ASSERT_OK(DB::Open(opts, dbname_, &db_));

  ASSERT_OK(Put(kKey, kValue));
  ASSERT_EQ(kValue, Get(kKey));

  ASSERT_OK(Flush());
  ASSERT_EQ(kValue, Get(kKey));
}
// After OptimizeForSmallDb() the table factory must be block-based, and the
// block cache it exposes must show usage growing from zero after a write and
// growing again when a flushed value is read into a PinnableSlice.
TEST_F(DBTest2, OptimizeForSmallDB) {
  Options opts = CurrentOptions();
  Close();
  opts.OptimizeForSmallDb();

  ASSERT_TRUE(opts.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));
  auto bbt_options = opts.table_factory->GetOptions<BlockBasedTableOptions>();
  ASSERT_TRUE(bbt_options != nullptr);

  std::shared_ptr<Cache> block_cache = bbt_options->block_cache;
  ASSERT_EQ(0, block_cache->GetUsage());

  ASSERT_OK(DB::Open(opts, dbname_, &db_));
  ASSERT_OK(Put("foo", "v1"));
  // Usage is expected to be non-zero right after the first write.
  ASSERT_NE(0, block_cache->GetUsage());
  ASSERT_EQ("v1", Get("foo"));

  ASSERT_OK(Flush());
  const size_t usage_after_flush = block_cache->GetUsage();

  // Reading the flushed value while holding it pinned must add to the usage.
  PinnableSlice pinned;
  ASSERT_OK(
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &pinned));
  ASSERT_GT(block_cache->GetUsage(), usage_after_flush);
  pinned.Reset();
}
// Uses sync-point dependencies to run a Put("foo","v2") + Flush on another
// thread between "DBImpl::NewIterator:1" and "DBImpl::NewIterator:2", then
// expects the new iterator to observe "v2".
TEST_F(DBTest2, IterRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
  });

  // Scoped so that the iterator is destroyed before the thread is joined.
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ("v2", iter->value().ToString());
  }

  writer.join();
  sync_point->DisableProcessing();
}
// Uses sync-point dependencies to run a Put("foo","v2") + Flush on another
// thread between "DBImpl::NewIterator:3" and "DBImpl::NewIterator:4", then
// expects the new iterator to still observe the old value "v1".
TEST_F(DBTest2, IterRaceFlush2) {
  ASSERT_OK(Put("foo", "v1"));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
  });

  // Scoped so that the iterator is destroyed before the thread is joined.
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ("v1", iter->value().ToString());
  }

  writer.join();
  sync_point->DisableProcessing();
}
// Uses sync-point dependencies to run a Put("foo","v2") + Flush on another
// thread between "ArenaWrappedDBIter::Refresh:1" and
// "ArenaWrappedDBIter::Refresh:2", then expects the refreshed iterator to
// observe "v2".
TEST_F(DBTest2, IterRefreshRaceFlush) {
  ASSERT_OK(Put("foo", "v1"));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
       {"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
  });

  // Scoped so that the iterator is destroyed before the thread is joined.
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    ASSERT_OK(iter->status());
    ASSERT_OK(iter->Refresh());
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ("v2", iter->value().ToString());
  }

  writer.join();
  sync_point->DisableProcessing();
}
// Uses sync-point dependencies to run a Put + Flush on another thread between
// "DBImpl::GetImpl:1" and "DBImpl::GetImpl:2"; the concurrent Get("foo") must
// not come back as "NOT_FOUND".
TEST_F(DBTest2, GetRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::GetImpl:1", "DBTest2::GetRaceFlush:1"},
       {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:2"}});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
  });

  ASSERT_NE("NOT_FOUND", Get("foo"));

  writer.join();
  sync_point->DisableProcessing();
}
// Uses sync-point dependencies to run a Put + Flush on another thread between
// "DBImpl::GetImpl:3" and "DBImpl::GetImpl:4"; the concurrent Get("foo") must
// not come back as "NOT_FOUND".
TEST_F(DBTest2, GetRaceFlush2) {
  ASSERT_OK(Put("foo", "v1"));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
       {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
  });

  ASSERT_NE("NOT_FOUND", Get("foo"));

  writer.join();
  sync_point->DisableProcessing();
}
// Smoke test for direct I/O: with direct reads and direct flush/compaction
// I/O enabled (and mmap disabled), two flushes, a full compaction and a
// reopen must all succeed. Does nothing when direct I/O is unsupported.
TEST_F(DBTest2, DirectIO) {
  if (!IsDirectIOSupported()) {
    return;
  }

  Options opts = CurrentOptions();
  opts.use_direct_io_for_flush_and_compaction = true;
  opts.use_direct_reads = true;
  opts.allow_mmap_writes = false;
  opts.allow_mmap_reads = false;
  DestroyAndReopen(opts);

  // Each row becomes one flushed file.
  const int kKeysPerFile[2][2] = {{0, 5}, {10, 15}};
  for (const auto& file_keys : kKeysPerFile) {
    for (const int key : file_keys) {
      ASSERT_OK(Put(Key(key), "a"));
    }
    ASSERT_OK(Flush());
  }

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  Reopen(opts);
}
// Exercises reads restricted to ReadOptions::read_tier = kMemtableTier on a
// non-default column family:
//   - before a flush, both keys are visible through Get() and an iterator;
//   - after Flush(1), Get() still returns both values while an iterator with
//     the same read options yields no entries;
//   - a key written after the flush is the only entry the iterator yields.
//
// Iterators are held in a std::unique_ptr so that a failing ASSERT_* (which
// returns from the test body) cannot leak a live iterator.
TEST_F(DBTest2, MemtableOnlyIterator) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "first"));
  ASSERT_OK(Put(1, "bar", "second"));

  ReadOptions ropt;
  ropt.read_tier = kMemtableTier;
  std::string value;
  std::unique_ptr<Iterator> it;

  // Before flushing
  // point lookups
  ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
  ASSERT_EQ("first", value);
  ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
  ASSERT_EQ("second", value);

  // Memtable-only iterator (read_tier=kMemtableTier); data not flushed yet.
  it.reset(db_->NewIterator(ropt, handles_[1]));
  int count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_OK(it->status());
  ASSERT_EQ(2, count);
  // Release the iterator before flushing, as the original code did.
  it.reset();

  ASSERT_OK(Flush(1));

  // After flushing
  // point lookups
  ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
  ASSERT_EQ("first", value);
  ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
  ASSERT_EQ("second", value);

  // Nothing should be returned by the memtable-only iterator after flushing.
  it.reset(db_->NewIterator(ropt, handles_[1]));
  ASSERT_OK(it->status());
  count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_EQ(0, count);
  ASSERT_OK(it->status());
  it.reset();

  // Add a key to the memtable; it must be the only entry returned.
  ASSERT_OK(Put(1, "foobar", "third"));
  it.reset(db_->NewIterator(ropt, handles_[1]));
  ASSERT_OK(it->status());
  count = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("foobar", it->key().ToString());
    ASSERT_EQ("third", it->value().ToString());
    count++;
  }
  ASSERT_TRUE(!it->Valid());
  ASSERT_EQ(1, count);
  ASSERT_OK(it->status());
  it.reset();
}
// Counts hits of the "GenericRateLimiter::Request:1" sync point to check when
// writes with WriteOptions::low_pri are sent through the rate limiter:
//   - not while L0 files are still being built up;
//   - once per small low-pri write afterwards, more than once for a 1 MiB
//     low-pri write, never for normal-priority writes;
//   - no longer after the pending compaction has been allowed to finish.
// The callback also asserts that the rate passed in is 1024 * 1024.
TEST_F(DBTest2, LowPriWrite) {
  Options opts = CurrentOptions();
  // Compaction pressure should trigger since 6 files
  opts.level0_file_num_compaction_trigger = 4;
  opts.level0_slowdown_writes_trigger = 12;
  opts.level0_stop_writes_trigger = 30;
  opts.delayed_write_rate = 8 * 1024 * 1024;
  Reopen(opts);

  SyncPoint* const sync_point = SyncPoint::GetInstance();

  std::atomic<int> limiter_requests(0);
  sync_point->SetCallBack("GenericRateLimiter::Request:1", [&](void* arg) {
    limiter_requests.fetch_add(1);
    const int64_t* rate = static_cast<int64_t*>(arg);
    ASSERT_EQ(1024 * 1024, *rate);
  });

  // Seed one file at level 6 and one at level 5.
  Random rnd(301);
  ASSERT_OK(Put("", rnd.RandomString(102400)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  MoveFilesToLevel(5);

  // Hold background compaction until the test reaches its own sync point.
  sync_point->LoadDependency({
      {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
  });
  sync_point->EnableProcessing();

  WriteOptions wo;
  // Writes `val` under the empty key with the requested priority.
  auto write = [&](const std::string& val, bool low_priority) -> Status {
    wo.low_pri = low_priority;
    return Put("", val, wo);
  };

  const std::string empty_value;
  for (int file = 0; file < 6; ++file) {
    ASSERT_OK(write(empty_value, false));
    ASSERT_OK(write(empty_value, true));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ(0, limiter_requests.load());

  ASSERT_OK(write(empty_value, true));
  ASSERT_EQ(1, limiter_requests.load());
  ASSERT_OK(write(empty_value, false));
  ASSERT_EQ(1, limiter_requests.load());

  // A 1 MiB low-pri write is expected to issue more than one request.
  const std::string big_value(1 * 1024 * 1024, 'x');
  ASSERT_OK(write(big_value, true));
  ASSERT_LT(1, limiter_requests.load());

  // The same write at normal priority issues none.
  limiter_requests.store(0);
  ASSERT_OK(write(big_value, false));
  ASSERT_EQ(0, limiter_requests.load());

  // Release the compaction and let it finish.
  TEST_SYNC_POINT("DBTest.LowPriWrite:0");
  sync_point->DisableProcessing();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(write(empty_value, true));
  ASSERT_EQ(0, limiter_requests.load());
  ASSERT_OK(write(empty_value, false));
  ASSERT_EQ(0, limiter_requests.load());
}
// Verifies that compaction reads are charged to a kReadsOnly rate limiter.
// For each combination of compaction_readahead_size {0, 32 KiB} and
// direct I/O {off, on (when supported)} the test:
//   1. blocks the LOW-priority pool with a sleeping task so that L0 files
//      accumulate without being compacted;
//   2. writes kNumL0Files files and then releases the compaction;
//   3. asserts the bytes that went through the limiter during compaction are
//      at least the input size and less than twice the input size (plus an
//      allowance when direct I/O is used), and that they are entirely
//      accounted to IO_LOW + IO_USER;
//   4. scans the DB with default ReadOptions and asserts the per-priority
//      totals did not change as a result.
TEST_F(DBTest2, RateLimitedCompactionReads) {
  // compaction input has 512KB data
  const int kNumKeysPerFile = 128;
  const int kBytesPerKey = 1024;
  const int kNumL0Files = 4;

  for (int compaction_readahead_size : {0, 32 << 10}) {
    for (auto use_direct_io : {false, true}) {
      if (use_direct_io && !IsDirectIOSupported()) {
        continue;
      }
      Options options = CurrentOptions();
      options.compaction_readahead_size = compaction_readahead_size;
      options.compression = kNoCompression;
      options.level0_file_num_compaction_trigger = kNumL0Files;
      options.memtable_factory.reset(
          test::NewSpecialSkipListFactory(kNumKeysPerFile));
      // Positional arguments 2 and 3 are 10 * 1000 and 10; the final argument
      // restricts the limiter to reads.
      options.rate_limiter.reset(NewGenericRateLimiter(
          static_cast<int64_t>(kNumL0Files * kNumKeysPerFile * kBytesPerKey),
          10 * 1000, 10, RateLimiter::Mode::kReadsOnly));
      options.use_direct_reads =
          options.use_direct_io_for_flush_and_compaction = use_direct_io;
      BlockBasedTableOptions bbto;
      bbto.block_size = 16384;
      bbto.no_block_cache = true;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      DestroyAndReopen(options);

      // Keep the single LOW-priority thread busy so no compaction can start
      // while the L0 files are being written.
      std::shared_ptr<test::SleepingBackgroundTask> sleeping_task(
          new test::SleepingBackgroundTask());
      env_->SetBackgroundThreads(1, Env::LOW);
      env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                     sleeping_task.get(), Env::Priority::LOW);
      sleeping_task->WaitUntilSleeping();

      for (int i = 0; i < kNumL0Files; ++i) {
        // One key more than the memtable factory's limit is written per file.
        for (int j = 0; j <= kNumKeysPerFile; ++j) {
          ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
        }
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
        if (i + 1 < kNumL0Files) {
          ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
        }
      }

      size_t rate_limited_bytes_start_bytes =
          options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL);

      // Release the LOW-priority thread and let the compaction run.
      sleeping_task->WakeUp();
      sleeping_task->WaitUntilDone();
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
      ASSERT_EQ(0, NumTableFilesAtLevel(0));

      // Should be slightly above 512KB due to non-data blocks read. Arbitrarily
      // chose 1MB as the upper bound on the total bytes read.
      size_t rate_limited_bytes =
          static_cast<size_t>(
              options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL)) -
          rate_limited_bytes_start_bytes;
      // The charges can exist for `IO_LOW` and `IO_USER` priorities.
      size_t rate_limited_bytes_by_pri =
          options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
          options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
      ASSERT_EQ(rate_limited_bytes,
                static_cast<size_t>(rate_limited_bytes_by_pri));
      // Extra allowance on the upper bound when direct I/O is in use.
      size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
      ASSERT_GE(
          rate_limited_bytes,
          static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
      ASSERT_LT(
          rate_limited_bytes,
          static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
                              direct_io_extra));

      // Scan the whole DB. The iterator is owned by a unique_ptr so that a
      // failing ASSERT_* inside the loop cannot leak it.
      {
        std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
        ASSERT_OK(iter->status());
        for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
          ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
        }
        // The loop also stops on an iteration error; make that visible.
        ASSERT_OK(iter->status());
      }

      // The scan above must not have changed the per-priority totals.
      rate_limited_bytes_by_pri =
          options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
          options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
      ASSERT_EQ(rate_limited_bytes,
                static_cast<size_t>(rate_limited_bytes_by_pri));
    }
  }
}
// Moves the only file from level 6 up to level 1 with a change_level
// CompactRange(), then reopens the DB with num_levels reduced from 7 to 3
// and checks that the file layout is unchanged.
TEST_F(DBTest2, ReduceLevel) {
  Options opts;
  opts.env = env_;
  opts.num_levels = 7;
  opts.disable_auto_compactions = true;
  Reopen(opts);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  CompactRangeOptions move_up;
  move_up.change_level = true;
  move_up.target_level = 1;
  ASSERT_OK(dbfull()->CompactRange(move_up, nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel());

  opts.num_levels = 3;
  Reopen(opts);
  ASSERT_EQ("0,1", FilesPerLevel());
}
// Writes 39 versions of one key, taking a snapshot after every write, with
// the versions spread over two level-6 files, one L0 file and the memtable.
// Then, for every seq in [1, 40), GetImpl() with a ReadCallback that admits
// only sequence numbers <= seq must return the value written as version seq.
TEST_F(DBTest2, ReadCallbackTest) {
  Options opts;
  opts.disable_auto_compactions = true;
  opts.num_levels = 7;
  opts.env = env_;
  Reopen(opts);

  const std::string key_name = "foo";
  const std::string value_prefix = "bar";
  std::vector<const Snapshot*> held_snapshots;

  // Writes version `n` of the key and, on success, pins it with a snapshot.
  auto write_version = [&](int n) -> Status {
    Status s = Put(key_name, value_prefix + std::to_string(n));
    if (s.ok()) {
      held_snapshots.push_back(dbfull()->GetSnapshot());
    }
    return s;
  };

  int version = 1;

  // Versions 1..9 and 10..19 go to two files that end up on level 6.
  for (; version < 10; ++version) {
    ASSERT_OK(write_version(version));
  }
  ASSERT_OK(Flush());
  for (; version < 20; ++version) {
    ASSERT_OK(write_version(version));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());

  // Versions 20..29 are flushed to L0.
  for (; version < 30; ++version) {
    ASSERT_OK(write_version(version));
  }
  ASSERT_OK(Flush());
  ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());

  // Versions 30..39 are written without a following flush.
  for (; version < 40; ++version) {
    ASSERT_OK(write_version(version));
  }

  // Callback that treats `max_visible` as the highest visible sequence number.
  class TestReadCallback : public ReadCallback {
   public:
    explicit TestReadCallback(SequenceNumber max_visible)
        : ReadCallback(max_visible), max_visible_(max_visible) {}

    bool IsVisibleFullCheck(SequenceNumber seq) override {
      return seq <= max_visible_;
    }

   private:
    SequenceNumber max_visible_;
  };

  for (int seq = 1; seq < version; ++seq) {
    PinnableSlice result;
    ReadOptions read_opts;
    TestReadCallback visibility(seq);
    bool value_found_unused = true;

    DBImpl::GetImplOptions impl_opts;
    impl_opts.column_family = dbfull()->DefaultColumnFamily();
    impl_opts.value = &result;
    impl_opts.value_found = &value_found_unused;
    impl_opts.callback = &visibility;

    Status s = dbfull()->GetImpl(read_opts, key_name, impl_opts);
    ASSERT_TRUE(s.ok());
    // The test expects sequence number `seq` to map to version `seq`.
    ASSERT_EQ(value_prefix + std::to_string(seq), result.ToString());
  }

  for (const Snapshot* snap : held_snapshots) {
    dbfull()->ReleaseSnapshot(snap);
  }
}
// Orders a background flush, DisableFileDeletions() and GetSortedWalFiles()
// against obsolete-file purging via sync points, and then asserts that every
// WAL file reported by GetSortedWalFiles() still exists after the purge has
// finished. The purge is slowed down by a one-second sleep at its beginning.
TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency({
      {"DBImpl::BackgroundCallFlush:FilesFound",
       "DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
      {"DBImpl::PurgeObsoleteFiles:End",
       "DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
  });
  sync_point->SetCallBack(
      "DBImpl::PurgeObsoleteFiles:Begin",
      [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
  sync_point->EnableProcessing();

  ASSERT_OK(Put("key", "val"));

  // Kick off a flush without waiting for it to complete.
  FlushOptions no_wait;
  no_wait.wait = false;
  ASSERT_OK(db_->Flush(no_wait));
  TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");

  ASSERT_OK(db_->DisableFileDeletions());
  VectorWalPtr wal_files;
  ASSERT_OK(db_->GetSortedWalFiles(wal_files));
  TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");

  for (const auto& wal : wal_files) {
    ASSERT_OK(env_->FileExists(LogFileName(dbname_, wal->LogNumber())));
  }

  ASSERT_OK(db_->EnableFileDeletions());
  sync_point->DisableProcessing();
}
// Pins the number of random reads and random-access file opens counted by
// the test env for flush, point lookup and full compaction when no block
// cache is used. The expected read counts for flush and compaction depend on
// whether the file system reports prefetch support.
TEST_F(DBTest2, TestNumPread) {
  Options opts = CurrentOptions();
  const bool prefetch_supported =
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
  // Expected random reads, with / without prefetch support.
  const int kReadsPerFlush = prefetch_supported ? 4 : 1;
  const int kReadsPerCompaction = prefetch_supported ? 6 : 3;

  // disable block cache
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = true;
  opts.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(opts);

  // Zeroes both the file-open counter and the read counter.
  auto reset_counters = [&]() {
    env_->random_file_open_counter_.store(0);
    env_->random_read_counter_.Reset();
  };

  env_->count_random_reads_ = true;
  // Only the open counter is cleared before the first flush.
  env_->random_file_open_counter_.store(0);

  // First flush.
  ASSERT_OK(Put("bar", "foo"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  ASSERT_EQ(kReadsPerFlush, env_->random_read_counter_.Read());
  ASSERT_EQ(1, env_->random_file_open_counter_.load());

  // Point lookup: one read, no new file open.
  reset_counters();
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_EQ(1, env_->random_read_counter_.Read());
  ASSERT_EQ(0, env_->random_file_open_counter_.load());

  // Second flush.
  reset_counters();
  ASSERT_OK(Put("bar2", "foo2"));
  ASSERT_OK(Put("foo2", "bar2"));
  ASSERT_OK(Flush());
  ASSERT_EQ(kReadsPerFlush, env_->random_read_counter_.Read());
  ASSERT_EQ(1, env_->random_file_open_counter_.load());

  // Full compaction of the two files.
  reset_counters();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(kReadsPerCompaction, env_->random_read_counter_.Read());
  ASSERT_EQ(1, env_->random_file_open_counter_.load());

  // Point lookup against the compacted file: one read, no new file open.
  reset_counters();
  ASSERT_EQ("foo2", Get("bar2"));
  ASSERT_EQ(1, env_->random_read_counter_.Read());
  ASSERT_EQ(0, env_->random_file_open_counter_.load());
}
// Result handler used by the trace replay tests. For every accepted execution
// result it validates the timestamps and the trace type, then bumps a
// per-category counter and accumulates the latency so that an average can be
// queried afterwards.
class TraceExecutionResultHandler : public TraceRecordResult::Handler {
 public:
  TraceExecutionResultHandler() = default;
  ~TraceExecutionResultHandler() override = default;

  // Status-only results must be of type kTraceWrite.
  Status Handle(const StatusOnlyTraceExecutionResult& result) override {
    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
      return Status::InvalidArgument("Invalid timestamps.");
    }
    result.GetStatus().PermitUncheckedError();
    if (result.GetTraceType() != kTraceWrite) {
      return Status::Corruption("Type mismatch.");
    }
    Tally(result.GetLatency(), &writes_);
    return Status::OK();
  }

  // Single-value results must be of type kTraceGet.
  Status Handle(const SingleValueTraceExecutionResult& result) override {
    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
      return Status::InvalidArgument("Invalid timestamps.");
    }
    result.GetStatus().PermitUncheckedError();
    if (result.GetTraceType() != kTraceGet) {
      return Status::Corruption("Type mismatch.");
    }
    Tally(result.GetLatency(), &gets_);
    return Status::OK();
  }

  // Multi-value results must be of type kTraceMultiGet.
  Status Handle(const MultiValuesTraceExecutionResult& result) override {
    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
      return Status::InvalidArgument("Invalid timestamps.");
    }
    // The per-key statuses are deliberately not inspected here.
    for (const Status& per_key_status : result.GetMultiStatus()) {
      per_key_status.PermitUncheckedError();
    }
    if (result.GetTraceType() != kTraceMultiGet) {
      return Status::Corruption("Type mismatch.");
    }
    Tally(result.GetLatency(), &multigets_);
    return Status::OK();
  }

  // Iterator results must be kTraceIteratorSeek or kTraceIteratorSeekForPrev.
  Status Handle(const IteratorTraceExecutionResult& result) override {
    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
      return Status::InvalidArgument("Invalid timestamps.");
    }
    result.GetStatus().PermitUncheckedError();
    const auto trace_type = result.GetTraceType();
    if (trace_type != kTraceIteratorSeek &&
        trace_type != kTraceIteratorSeekForPrev) {
      return Status::Corruption("Type mismatch.");
    }
    Tally(result.GetLatency(), &seeks_);
    return Status::OK();
  }

  // Clears the accumulated latency and every counter.
  void Reset() {
    total_latency_.store(0);
    cnt_.store(0);
    writes_.store(0);
    gets_.store(0);
    seeks_.store(0);
    multigets_.store(0);
  }

  // Mean latency over all accepted results; 0.0 when nothing was accepted.
  double GetAvgLatency() const {
    if (cnt_ == 0) {
      return 0.0;
    }
    return 1.0 * total_latency_ / cnt_;
  }

  int GetNumWrites() const { return writes_.load(); }
  int GetNumGets() const { return gets_.load(); }
  int GetNumIterSeeks() const { return seeks_.load(); }
  int GetNumMultiGets() const { return multigets_.load(); }

 private:
  // Adds one accepted result: accumulates its latency, bumps the overall
  // count and the given per-category counter.
  void Tally(uint64_t latency, std::atomic<int>* category_counter) {
    total_latency_ += latency;
    cnt_++;
    (*category_counter)++;
  }

  std::atomic<uint64_t> total_latency_{0};
  std::atomic<uint32_t> cnt_{0};
  std::atomic<int> writes_{0};
  std::atomic<int> gets_{0};
  std::atomic<int> seeks_{0};
  std::atomic<int> multigets_{0};
};
// Traces a mix of writes, point lookups and iterator seeks on the test DB,
// then replays the trace several times into a second DB through
// Replayer::Replay() and checks both the replayed data and the number of
// results reported to the result callback.
TEST_F(DBTest2, TraceAndReplay) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Ending a trace before one was started is expected to fail with IOError.
  ASSERT_TRUE(db_->EndTrace().IsIOError());

  std::string trace_filename = dbname_ + "/rocksdb.trace";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));

  // --- Traced operations -------------------------------------------------
  // Five individual writes to the default column family.
  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Merge(0, "b", "2"));
  ASSERT_OK(Delete(0, "c"));
  ASSERT_OK(SingleDelete(0, "d"));
  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));

  // One batched write.
  WriteBatch batch;
  ASSERT_OK(batch.Put("f", "11"));
  ASSERT_OK(batch.Merge("g", "12"));
  ASSERT_OK(batch.Delete("h"));
  ASSERT_OK(batch.SingleDelete("i"));
  ASSERT_OK(batch.DeleteRange("j", "k"));
  ASSERT_OK(db_->Write(wo, &batch));

  // Two iterator seeks. The iterator is owned by a scoped unique_ptr so it is
  // released even if the status assertion fails and returns early.
  {
    std::unique_ptr<Iterator> single_iter(db_->NewIterator(ro));
    single_iter->Seek("f");
    single_iter->SeekForPrev("g");
    ASSERT_OK(single_iter->status());
  }

  // Two point lookups, two writes to column family 1, one more lookup.
  ASSERT_EQ("1", Get(0, "a"));
  ASSERT_EQ("12", Get(0, "g"));
  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "rocksdb", "rocks"));
  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));

  ASSERT_OK(db_->EndTrace());

  // These are issued after the trace ended; the checks after replay expect
  // "hello" to be absent from the replay DB.
  ASSERT_OK(Put("hello", "world"));
  ASSERT_OK(Merge("foo", "bar"));

  // --- Create the replay target DB with the same column families ---------
  std::string value;
  std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
  ASSERT_OK(DestroyDB(dbname2, options));

  std::unique_ptr<DB> db2_init;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  db2_init.reset();

  std::unique_ptr<DB> db2;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  std::vector<ColumnFamilyHandle*> handles;
  DBOptions db_opts;
  db_opts.env = env_;
  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // The replay DB starts out empty.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  std::unique_ptr<Replayer> replayer;
  ASSERT_OK(
      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));

  // Every non-null result is routed through res_handler, which counts it.
  TraceExecutionResultHandler res_handler;
  std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> res_cb =
      [&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
        ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
        if (res != nullptr) {
          ASSERT_OK(res->Accept(&res_handler));
          res.reset();
        }
      };

  // --- First replay ------------------------------------------------------
  // Replaying before Prepare() is expected to return Incomplete.
  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
  ASSERT_OK(replayer->Prepare());
  // Calling Prepare() again is expected to succeed.
  ASSERT_OK(replayer->Prepare());
  // NOTE(review): ReplayOptions(1, 1.0) presumably means one thread at the
  // original speed; confirm against the ReplayOptions declaration.
  ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  // 8 writes = 5 single writes + 1 batch + 2 puts to column family 1.
  ASSERT_EQ(res_handler.GetNumWrites(), 8);
  ASSERT_EQ(res_handler.GetNumGets(), 3);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // The traced writes are now visible in the replay DB; the untraced are not.
  ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
  ASSERT_EQ("1", value);
  ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
  ASSERT_EQ("12", value);
  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());

  ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
  ASSERT_EQ("bar", value);
  ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
  ASSERT_EQ("rocks", value);

  // --- Second replay -----------------------------------------------------
  // Replaying again without a fresh Prepare() is expected to return
  // Incomplete.
  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
  ASSERT_OK(replayer->Prepare());
  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 8);
  ASSERT_EQ(res_handler.GetNumGets(), 3);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // --- Third replay ------------------------------------------------------
  ASSERT_OK(replayer->Prepare());
  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 8);
  ASSERT_EQ(res_handler.GetNumGets(), 3);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // --- Cleanup: replayer, then handles, then the DB itself ----------------
  replayer.reset();

  for (auto handle : handles) {
    delete handle;
  }
  db2.reset();
  ASSERT_OK(DestroyDB(dbname2, options));
}
// Traces writes, point lookups and (bounded and unbounded) iterator seeks on
// the test DB, then replays the trace record by record through
// Replayer::Next()/Execute(). Afterwards it executes hand-built trace records
// of every query type, including invalid ones, and checks the reported
// statuses and result counts.
TEST_F(DBTest2, TraceAndManualReplay) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Ending a trace before one was started is expected to fail with IOError.
  ASSERT_TRUE(db_->EndTrace().IsIOError());

  std::string trace_filename = dbname_ + "/rocksdb.trace";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));

  // --- Traced operations -------------------------------------------------
  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Merge(0, "b", "2"));
  ASSERT_OK(Delete(0, "c"));
  ASSERT_OK(SingleDelete(0, "d"));
  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));

  WriteBatch batch;
  ASSERT_OK(batch.Put("f", "11"));
  ASSERT_OK(batch.Merge("g", "12"));
  ASSERT_OK(batch.Delete("h"));
  ASSERT_OK(batch.SingleDelete("i"));
  ASSERT_OK(batch.DeleteRange("j", "k"));
  ASSERT_OK(db_->Write(wo, &batch));

  // Two unbounded seeks. The iterator is held by a unique_ptr so it is
  // released even when an assertion below fails and returns early.
  std::unique_ptr<Iterator> single_iter(db_->NewIterator(ro));
  single_iter->Seek("f");
  single_iter->SeekForPrev("g");
  ASSERT_OK(single_iter->status());
  single_iter.reset();

  // Keys used by the bounded iterator below.
  batch.Clear();
  ASSERT_OK(batch.Put("iter-0", "iter-0"));
  ASSERT_OK(batch.Put("iter-1", "iter-1"));
  ASSERT_OK(batch.Put("iter-2", "iter-2"));
  ASSERT_OK(batch.Put("iter-3", "iter-3"));
  ASSERT_OK(batch.Put("iter-4", "iter-4"));
  ASSERT_OK(db_->Write(wo, &batch));

  // Six seeks on an iterator restricted to ["iter-1", "iter-3").
  ReadOptions bounded_ro = ro;
  Slice lower_bound("iter-1");
  Slice upper_bound("iter-3");
  bounded_ro.iterate_lower_bound = &lower_bound;
  bounded_ro.iterate_upper_bound = &upper_bound;
  single_iter.reset(db_->NewIterator(bounded_ro));
  single_iter->Seek("iter-0");
  ASSERT_EQ(single_iter->key().ToString(), "iter-1");
  single_iter->Seek("iter-2");
  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
  single_iter->Seek("iter-4");
  ASSERT_FALSE(single_iter->Valid());
  single_iter->SeekForPrev("iter-0");
  ASSERT_FALSE(single_iter->Valid());
  single_iter->SeekForPrev("iter-2");
  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
  single_iter->SeekForPrev("iter-4");
  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
  ASSERT_OK(single_iter->status());
  single_iter.reset();

  ASSERT_EQ("1", Get(0, "a"));
  ASSERT_EQ("12", Get(0, "g"));

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "rocksdb", "rocks"));
  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));

  ASSERT_OK(db_->EndTrace());

  // These are issued after the trace ended; the checks after replay expect
  // "hello" to be absent from the replay DB.
  ASSERT_OK(Put("hello", "world"));
  ASSERT_OK(Merge("foo", "bar"));

  // --- Create the replay target DB with the same column families ---------
  std::string value;
  std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
  ASSERT_OK(DestroyDB(dbname2, options));

  std::unique_ptr<DB> db2_init;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  db2_init.reset();

  std::unique_ptr<DB> db2;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  std::vector<ColumnFamilyHandle*> handles;
  DBOptions db_opts;
  db_opts.env = env_;
  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // The replay DB starts out empty.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  std::unique_ptr<Replayer> replayer;
  ASSERT_OK(
      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));

  TraceExecutionResultHandler res_handler;

  // --- Manual replay, performed twice ------------------------------------
  std::unique_ptr<TraceRecord> record;
  std::unique_ptr<TraceRecordResult> result;
  for (int i = 0; i < 2; i++) {
    // Next() before Prepare() is expected to return Incomplete.
    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
    ASSERT_OK(replayer->Prepare());
    Status s = Status::OK();
    // Read records until Next() reports something other than OK or
    // NotSupported. The loop condition must not be `s.ok()`: a NotSupported
    // status followed by `continue` would then end the loop instead of
    // skipping the record.
    while (true) {
      s = replayer->Next(&record);
      if (s.IsNotSupported()) {
        // Skip unsupported operations and keep reading.
        continue;
      }
      if (!s.ok()) {
        break;
      }
      ASSERT_OK(replayer->Execute(record, &result));
      if (result == nullptr) {
        continue;
      }
      ASSERT_OK(result->Accept(&res_handler));
      if (record->GetTraceType() == kTraceIteratorSeek ||
          record->GetTraceType() == kTraceIteratorSeekForPrev) {
        IteratorSeekQueryTraceRecord* iter_rec =
            dynamic_cast<IteratorSeekQueryTraceRecord*>(record.get());
        IteratorTraceExecutionResult* iter_res =
            dynamic_cast<IteratorTraceExecutionResult*>(result.get());
        // Fail cleanly instead of dereferencing a null pointer if either
        // object is not of the expected dynamic type.
        ASSERT_TRUE(iter_rec != nullptr);
        ASSERT_TRUE(iter_res != nullptr);
        // Check that the bounds were saved in the trace and decoded again.
        std::string lower_str = iter_rec->GetLowerBound().ToString();
        std::string upper_str = iter_rec->GetUpperBound().ToString();
        std::string iter_key = iter_res->GetKey().ToString();
        std::string iter_value = iter_res->GetValue().ToString();
        if (!lower_str.empty() && !upper_str.empty()) {
          ASSERT_EQ(lower_str, "iter-1");
          ASSERT_EQ(upper_str, "iter-3");
          if (iter_res->GetValid()) {
            // Valid iterator: lower bound <= key < upper bound.
            ASSERT_GE(iter_key, lower_str);
            ASSERT_LT(iter_key, upper_str);
          } else {
            // Invalid iterator: key < lower bound or key >= upper bound.
            ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str);
          }
        }
        // An invalid iterator result carries an empty key and value.
        if (!iter_res->GetValid()) {
          ASSERT_TRUE(iter_key.empty());
          ASSERT_TRUE(iter_value.empty());
        }
      }
      result.reset();
    }
    // The trace ends with Incomplete, and so does any further Next().
    ASSERT_TRUE(s.IsIncomplete());
    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
    ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
    // 9 writes = 5 single writes + 2 batches + 2 puts to column family 1.
    ASSERT_EQ(res_handler.GetNumWrites(), 9);
    ASSERT_EQ(res_handler.GetNumGets(), 3);
    // 8 seeks = 2 unbounded + 6 bounded.
    ASSERT_EQ(res_handler.GetNumIterSeeks(), 8);
    ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
    res_handler.Reset();
  }

  // The traced writes are now visible in the replay DB; the untraced are not.
  ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
  ASSERT_EQ("1", value);
  ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
  ASSERT_EQ("12", value);
  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());

  ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
  ASSERT_EQ("bar", value);
  ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
  ASSERT_EQ("rocks", value);

  // --- Hand-built records: write -------------------------------------------
  uint64_t fake_ts = 1U;

  batch.Clear();
  ASSERT_OK(batch.Put("trace-record-write1", "write1"));
  ASSERT_OK(batch.Put("trace-record-write2", "write2"));
  record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  ASSERT_OK(result->Accept(&res_handler));
  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
  ASSERT_EQ("write1", value);
  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
  ASSERT_EQ("write2", value);
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 1);
  ASSERT_EQ(res_handler.GetNumGets(), 0);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // --- Hand-built records: get ---------------------------------------------
  // Existing key.
  record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
                                       "trace-record-write1", fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  ASSERT_OK(result->Accept(&res_handler));
  // Key that was never written; Execute() is still expected to succeed.
  record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
                                       fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  ASSERT_OK(result->Accept(&res_handler));
  // Unknown column family id: Corruption and no result object.
  uint32_t invalid_cf_id = handles[1]->GetID() + 1;
  record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
  ASSERT_TRUE(result == nullptr);
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 0);
  ASSERT_EQ(res_handler.GetNumGets(), 2);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // --- Hand-built records: iterator seeks ----------------------------------
  for (IteratorSeekQueryTraceRecord::SeekType seekType :
       {IteratorSeekQueryTraceRecord::kSeek,
        IteratorSeekQueryTraceRecord::kSeekForPrev}) {
    // Seek to an existing key.
    record.reset(new IteratorSeekQueryTraceRecord(
        seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
    ASSERT_OK(replayer->Execute(record, &result));
    ASSERT_TRUE(result != nullptr);
    ASSERT_OK(result->Accept(&res_handler));
    // Seek to a key that was never written.
    record.reset(new IteratorSeekQueryTraceRecord(
        seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
    ASSERT_OK(replayer->Execute(record, &result));
    ASSERT_TRUE(result != nullptr);
    ASSERT_OK(result->Accept(&res_handler));
    // Unknown column family id: Corruption and no result object.
    record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
                                                  "whatever", fake_ts++));
    ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
    ASSERT_TRUE(result == nullptr);
  }
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 0);
  ASSERT_EQ(res_handler.GetNumGets(), 0);
  // 4 seeks = 2 successful seeks for each of the 2 seek types.
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 4);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
  res_handler.Reset();

  // --- Hand-built records: multi-get ---------------------------------------
  // Both keys exist.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
      std::vector<std::string>({"a", "foo"}), fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  ASSERT_OK(result->Accept(&res_handler));
  // Neither key exists.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
      std::vector<std::string>({"no1", "no2"}), fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  ASSERT_OK(result->Accept(&res_handler));
  // One key exists, the other does not: check the per-key outcome.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
      std::vector<std::string>({"a", "no2"}), fake_ts++));
  ASSERT_OK(replayer->Execute(record, &result));
  ASSERT_TRUE(result != nullptr);
  MultiValuesTraceExecutionResult* mvr =
      dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
  ASSERT_TRUE(mvr != nullptr);
  ASSERT_OK(mvr->GetMultiStatus()[0]);
  ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
  ASSERT_EQ(mvr->GetValues()[0], "1");
  ASSERT_EQ(mvr->GetValues()[1], "");
  ASSERT_OK(result->Accept(&res_handler));
  // One of the column family ids is unknown: Corruption, no result.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>(
          {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
      std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
  ASSERT_TRUE(result == nullptr);
  // Empty id and key lists: InvalidArgument, no result.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
  ASSERT_TRUE(result == nullptr);
  // Id and key lists of different length: InvalidArgument, no result.
  record.reset(new MultiGetQueryTraceRecord(
      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
      std::vector<std::string>({"a"}), fake_ts++));
  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
  ASSERT_TRUE(result == nullptr);
  ASSERT_GE(res_handler.GetAvgLatency(), 0.0);
  ASSERT_EQ(res_handler.GetNumWrites(), 0);
  ASSERT_EQ(res_handler.GetNumGets(), 0);
  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
  ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
  res_handler.Reset();

  // --- Cleanup: replayer, then handles, then the DB itself ----------------
  replayer.reset();

  for (auto handle : handles) {
    delete handle;
  }
  db2.reset();
  ASSERT_OK(DestroyDB(dbname2, options));
}
// Traces three puts with TraceOptions::max_trace_file_size set to 5 and
// checks that replaying the resulting trace into a second DB leaves all three
// keys absent there.
TEST_F(DBTest2, TraceWithLimit) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);
  Random rnd(301);

  // Record the three puts under the tiny size limit.
  trace_opts.max_trace_file_size = 5;
  std::string trace_path = dbname_ + "/rocksdb.trace1";
  std::unique_ptr<TraceWriter> writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_path, &writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(writer)));
  for (const char* key : {"a", "b", "c"}) {
    ASSERT_OK(Put(0, key, "1"));
  }
  ASSERT_OK(db_->EndTrace());

  // Build the replay target: create it, add the extra column family, close
  // it, then reopen it with both column families listed.
  std::string replay_db_path = test::PerThreadDBPath(env_, "/db_replay2");
  std::string value;
  ASSERT_OK(DestroyDB(replay_db_path, options));

  options.create_if_missing = true;
  {
    std::unique_ptr<DB> bootstrap_db;
    ASSERT_OK(DB::Open(options, replay_db_path, &bootstrap_db));
    ColumnFamilyHandle* extra_cf;
    ASSERT_OK(bootstrap_db->CreateColumnFamily(ColumnFamilyOptions(),
                                               "pikachu", &extra_cf));
    delete extra_cf;
  }

  std::unique_ptr<DB> db2;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  std::vector<ColumnFamilyHandle*> handles;
  DBOptions db_opts;
  db_opts.env = env_;
  ASSERT_OK(
      DB::Open(db_opts, replay_db_path, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // Nothing is present before the replay.
  for (const char* key : {"a", "b", "c"}) {
    ASSERT_TRUE(db2->Get(ro, handles[0], key, &value).IsNotFound());
  }

  std::unique_ptr<TraceReader> reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_path, &reader));
  std::unique_ptr<Replayer> replayer;
  ASSERT_OK(db2->NewDefaultReplayer(handles, std::move(reader), &replayer));
  ASSERT_OK(replayer->Prepare());
  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
  replayer.reset();

  // Still nothing after the replay: none of the puts was replayed.
  for (const char* key : {"a", "b", "c"}) {
    ASSERT_TRUE(db2->Get(ro, handles[0], key, &value).IsNotFound());
  }

  for (ColumnFamilyHandle* handle : handles) {
    delete handle;
  }
  db2.reset();
  ASSERT_OK(DestroyDB(replay_db_path, options));
}
// Traces five puts with TraceOptions::sampling_frequency set to 2 and checks
// that, after replaying into a second DB, only the 2nd and 4th keys ("b" and
// "d") are present there.
TEST_F(DBTest2, TraceWithSampling) {
  Options options = CurrentOptions();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);
  Random rnd(301);

  // Keys and values of the traced puts, in issue order.
  const char* const keys[] = {"a", "b", "c", "d", "e"};
  const char* const values[] = {"1", "2", "3", "4", "5"};
  const size_t num_keys = sizeof(keys) / sizeof(keys[0]);

  trace_opts.sampling_frequency = 2;
  std::string trace_path = dbname_ + "/rocksdb.trace_sampling";
  std::unique_ptr<TraceWriter> writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_path, &writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(writer)));
  for (size_t i = 0; i < num_keys; ++i) {
    ASSERT_OK(Put(0, keys[i], values[i]));
  }
  ASSERT_OK(db_->EndTrace());

  // Build the replay target: create it, add the extra column family, close
  // it, then reopen it with both column families listed.
  std::string replay_db_path =
      test::PerThreadDBPath(env_, "/db_replay_sampling");
  std::string value;
  ASSERT_OK(DestroyDB(replay_db_path, options));

  options.create_if_missing = true;
  {
    std::unique_ptr<DB> bootstrap_db;
    ASSERT_OK(DB::Open(options, replay_db_path, &bootstrap_db));
    ColumnFamilyHandle* extra_cf;
    ASSERT_OK(bootstrap_db->CreateColumnFamily(ColumnFamilyOptions(),
                                               "pikachu", &extra_cf));
    delete extra_cf;
  }

  std::unique_ptr<DB> db2;
  ColumnFamilyOptions cf_options;
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  std::vector<ColumnFamilyHandle*> handles;
  DBOptions db_opts;
  db_opts.env = env_;
  ASSERT_OK(
      DB::Open(db_opts, replay_db_path, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // Nothing is present before the replay.
  for (size_t i = 0; i < num_keys; ++i) {
    ASSERT_TRUE(db2->Get(ro, handles[0], keys[i], &value).IsNotFound());
  }

  std::unique_ptr<TraceReader> reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_path, &reader));
  std::unique_ptr<Replayer> replayer;
  ASSERT_OK(db2->NewDefaultReplayer(handles, std::move(reader), &replayer));
  ASSERT_OK(replayer->Prepare());
  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
  replayer.reset();

  // Keys at odd positions (0-based) were replayed; the others were not.
  for (size_t i = 0; i < num_keys; ++i) {
    const bool missing =
        db2->Get(ro, handles[0], keys[i], &value).IsNotFound();
    if (i % 2 == 0) {
      ASSERT_TRUE(missing);
    } else {
      ASSERT_FALSE(missing);
    }
  }

  for (ColumnFamilyHandle* handle : handles) {
    delete handle;
  }
  db2.reset();
  ASSERT_OK(DestroyDB(replay_db_path, options));
}
// Exercises TraceOptions::filter in two parts:
//  1. With kTraceFilterWrite, a mixed workload is traced and replayed into a
//     second DB; none of the written keys may show up there.
//  2. With kTraceFilterGet, writes and reads are traced on a third DB and the
//     number of records in the resulting trace file is checked.
TEST_F(DBTest2, TraceWithFilter) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);

  // ===== Part 1: filter out writes ========================================
  trace_opts.filter = TraceFilterType::kTraceFilterWrite;

  std::string trace_filename = dbname_ + "/rocksdb.trace";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));

  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Merge(0, "b", "2"));
  ASSERT_OK(Delete(0, "c"));
  ASSERT_OK(SingleDelete(0, "d"));
  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));

  WriteBatch batch;
  ASSERT_OK(batch.Put("f", "11"));
  ASSERT_OK(batch.Merge("g", "12"));
  ASSERT_OK(batch.Delete("h"));
  ASSERT_OK(batch.SingleDelete("i"));
  ASSERT_OK(batch.DeleteRange("j", "k"));
  ASSERT_OK(db_->Write(wo, &batch));

  // The iterator is scoped and owned by a unique_ptr so it cannot leak, and
  // its status is checked like in the other trace tests.
  {
    std::unique_ptr<Iterator> single_iter(db_->NewIterator(ro));
    single_iter->Seek("f");
    single_iter->SeekForPrev("g");
    ASSERT_OK(single_iter->status());
  }

  ASSERT_EQ("1", Get(0, "a"));
  ASSERT_EQ("12", Get(0, "g"));

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "rocksdb", "rocks"));
  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));

  ASSERT_OK(db_->EndTrace());
  // Issued after the trace ended.
  ASSERT_OK(Put("hello", "world"));
  ASSERT_OK(Merge("foo", "bar"));

  // Create the replay target DB with the same column families.
  std::string value;
  std::string dbname2 = test::PerThreadDBPath(env_, "db_replay");
  ASSERT_OK(DestroyDB(dbname2, options));

  std::unique_ptr<DB> db2_init;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  db2_init.reset();

  std::unique_ptr<DB> db2;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  std::vector<ColumnFamilyHandle*> handles;
  DBOptions db_opts;
  db_opts.env = env_;
  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // The replay DB starts out empty.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  std::unique_ptr<Replayer> replayer;
  ASSERT_OK(
      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
  ASSERT_OK(replayer->Prepare());
  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
  replayer.reset();

  // No key may be present after the replay, because writes were filtered out
  // of the trace.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());
  // "foo" and "rocksdb" were written to column family 1 in the source DB, so
  // that is the column family in which a replayed write would appear.
  ASSERT_TRUE(db2->Get(ro, handles[1], "foo", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[1], "rocksdb", &value).IsNotFound());

  for (auto handle : handles) {
    delete handle;
  }
  db2.reset();
  ASSERT_OK(DestroyDB(dbname2, options));

  // ===== Part 2: filter out gets ==========================================
  std::string dbname3 = test::PerThreadDBPath(env_, "db_not_trace_read");
  ASSERT_OK(DestroyDB(dbname3, options));

  std::unique_ptr<DB> db3_init;
  options.create_if_missing = true;
  ColumnFamilyHandle* cf3;
  ASSERT_OK(DB::Open(options, dbname3, &db3_init));
  ASSERT_OK(
      db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
  delete cf3;
  db3_init.reset();

  column_families.clear();
  column_families.emplace_back("default", cf_options);
  column_families.emplace_back("pikachu", ColumnFamilyOptions());
  // The old handles were deleted above; drop the stale pointers before reuse.
  handles.clear();

  std::unique_ptr<DB> db3;
  ASSERT_OK(DB::Open(db_opts, dbname3, column_families, &handles, &db3));

  env_->SleepForMicroseconds(100);
  ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());

  trace_opts.filter = TraceFilterType::kTraceFilterGet;
  std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
  std::unique_ptr<TraceWriter> trace_writer3;
  ASSERT_OK(
      NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
  ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));

  // Four writes and two gets while tracing.
  ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
  ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
  ASSERT_OK(db3->Delete(wo, handles[0], "c"));
  ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));

  ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
  ASSERT_EQ(value, "1");
  ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());

  ASSERT_OK(db3->EndTrace());

  for (auto handle : handles) {
    delete handle;
  }
  db3.reset();
  ASSERT_OK(DestroyDB(dbname3, options));

  // Count every record that can be read back from the trace file.
  std::unique_ptr<TraceReader> trace_reader3;
  ASSERT_OK(
      NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
  int count = 0;
  std::string data;
  Status s;
  while (true) {
    s = trace_reader3->Read(&data);
    if (!s.ok()) {
      break;
    }
    count += 1;
  }
  // NOTE(review): 6 is presumably the 4 writes plus a header and a footer
  // record, with the 2 gets filtered out; confirm against the trace format.
  ASSERT_EQ(count, 6);
}
// Checks when a Get() into a PinnableSlice reports the value as pinned while
// allow_mmap_reads is on: not pinned on a writable DB or on a read-only DB
// with max_open_files = 100, pinned on a read-only DB with
// max_open_files = -1.
TEST_F(DBTest2, PinnableSliceAndMmapReads) {
  Options options = CurrentOptions();
  options.env = env_;
  if (!IsMemoryMappedAccessSupported()) {
    ROCKSDB_GTEST_SKIP("Test requires default environment");
    return;
  }
  options.allow_mmap_reads = true;
  options.max_open_files = 100;
  options.compression = kNoCompression;
  Reopen(options);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  // Writable DB: the value is readable but must not be pinned.
  PinnableSlice pinned;
  ASSERT_OK(Get("foo", &pinned));
  ASSERT_FALSE(pinned.IsPinned());
  ASSERT_EQ(pinned.ToString(), "bar");

  // Run a compaction through the test-only hook while still holding the
  // slice, then make sure the slice still reads "bar".
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
  ASSERT_EQ(pinned.ToString(), "bar");

  // Read-only DB with max_open_files still at 100: not pinned either.
  pinned.Reset();
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_OK(Get("foo", &pinned));
  ASSERT_FALSE(pinned.IsPinned());
  ASSERT_EQ(pinned.ToString(), "bar");

  // Read-only DB with max_open_files = -1: now the value is pinned.
  pinned.Reset();
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_OK(Get("foo", &pinned));
  ASSERT_TRUE(pinned.IsPinned());
  ASSERT_EQ(pinned.ToString(), "bar");
}
// (Disabled) Checks block-cache pinned usage while a user iterator walks the
// DB, and uses SyncPoint callbacks on block construction/destruction to bound
// the number of blocks alive at once during a manual compaction.
TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = false;
  bbto.cache_index_and_filter_blocks = false;
  bbto.block_cache = NewLRUCache(100000);
  // Block size equals the value size used below.
  bbto.block_size = 400;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  std::string v = rnd.RandomString(400);

  // First file: four keys, each with a 400-byte value.
  ASSERT_OK(Put("1", v));
  ASSERT_OK(Put("3", v));
  ASSERT_OK(Put("5", v));
  ASSERT_OK(Put("7", v));
  ASSERT_OK(Flush());

  // Nothing is pinned while no iterator exists.
  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // While an iterator is positioned on a key, the pinned usage must be
  // non-zero but below 800 bytes (twice the configured block size).
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->SeekToFirst();

    for (int i = 0; i < 4; i++) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
      ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());

    iter->Seek("4");
    ASSERT_TRUE(iter->Valid());
    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);

    iter->Seek("3");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
  }
  // Destroying the iterator releases everything it pinned.
  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // Second file: four more keys.
  ASSERT_OK(Put("2", v));
  ASSERT_OK(Put("5", v));
  ASSERT_OK(Put("6", v));
  ASSERT_OK(Put("8", v));
  ASSERT_OK(Flush());

  // Shrink the cache capacity to zero and restore it before the compaction.
  bbto.block_cache->SetCapacity(0);
  bbto.block_cache->SetCapacity(100000);

  // Track block construction/destruction during the compaction. `finished`
  // turns both callbacks into no-ops once the compaction reaches its verify
  // step.
  std::atomic<bool> finished(false);
  std::atomic<int> block_newed(0);
  std::atomic<int> block_destroyed(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Block::Block:0", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Before this construction at most one block may be outstanding.
        EXPECT_GE(block_newed.load(), block_destroyed.load());
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
        block_newed.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Block::~Block", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Before this destruction one or two blocks are outstanding.
        EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
        block_destroyed.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run:BeforeVerify",
      [&](void* /*arg*/) { finished = true; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // The compaction is expected to create and destroy eight blocks in total.
  ASSERT_EQ(8, block_newed.load());
  ASSERT_EQ(8, block_destroyed.load());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // The callbacks capture locals of this test by reference; remove them so
  // they cannot be invoked after those locals are gone.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Two threads each fetch a column family handle through
// DBImpl::GetColumnFamilyHandleUnlocked(). SyncPoint dependencies order them
// so that thread 1 re-reads its handle only after thread 2 has fetched its
// own; both handles must still report the expected column family id.
TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
  // Ordering enforced by the dependencies below:
  //   thread 1 fetches -> thread 2 fetches -> thread 1 re-reads.
  SyncPoint::GetInstance()->LoadDependency({
      {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1",
       "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2"},
      {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2",
       "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  CreateColumnFamilies({"test1", "test2"}, Options());
  ASSERT_EQ(handles_.size(), 2);

  DBImpl* db_impl = dbfull();

  port::Thread first_reader([&]() {
    auto fetched = db_impl->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
    ASSERT_EQ(fetched->GetID(), handles_[0]->GetID());
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
    // Re-read after the other thread has fetched its handle.
    ASSERT_EQ(fetched->GetID(), handles_[0]->GetID());
  });

  port::Thread second_reader([&]() {
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
    auto fetched = db_impl->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
    ASSERT_EQ(fetched->GetID(), handles_[1]->GetID());
    TEST_SYNC_POINT(
        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
    ASSERT_EQ(fetched->GetID(), handles_[1]->GetID());
  });

  first_reader.join();
  second_reader.join();

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Runs CompactFiles() on two ingested L1 files concurrently with the
// ingestion of a third external file whose key range ("3".."4") lies between
// them, and expects both operations to succeed.
TEST_F(DBTest2, TestCompactFiles) {
  SyncPoint* const sync_points = SyncPoint::GetInstance();
  // NOTE(review): the successor sync point "IngestExternalFile2" is not
  // emitted in this test body; presumably it lives in the DB code - confirm.
  sync_points->LoadDependency({
      {"TestCompactFiles::IngestExternalFile1",
       "TestCompactFiles::IngestExternalFile2"},
  });
  sync_points->EnableProcessing();

  Options options;
  options.env = env_;
  options.num_levels = 2;
  options.disable_auto_compactions = true;
  Reopen(options);

  auto* default_cf = db_->DefaultColumnFamily();
  ASSERT_EQ(db_->NumberLevels(default_cf), 2);

  ROCKSDB_NAMESPACE::SstFileWriter writer{ROCKSDB_NAMESPACE::EnvOptions(),
                                          options};
  const std::string sst_low = dbname_ + "/test_compact_files1.sst_t";
  const std::string sst_mid = dbname_ + "/test_compact_files2.sst_t";
  const std::string sst_high = dbname_ + "/test_compact_files3.sst_t";

  // Keys "1".."2"
  ASSERT_OK(writer.Open(sst_low));
  ASSERT_OK(writer.Put("1", "1"));
  ASSERT_OK(writer.Put("2", "2"));
  ASSERT_OK(writer.Finish());

  // Keys "3".."4"
  ASSERT_OK(writer.Open(sst_mid));
  ASSERT_OK(writer.Put("3", "3"));
  ASSERT_OK(writer.Put("4", "4"));
  ASSERT_OK(writer.Finish());

  // Keys "5".."6"
  ASSERT_OK(writer.Open(sst_high));
  ASSERT_OK(writer.Put("5", "5"));
  ASSERT_OK(writer.Put("6", "6"));
  ASSERT_OK(writer.Finish());

  // Ingest the outer two files first; both are expected in level 1.
  ASSERT_OK(db_->IngestExternalFile(default_cf, {sst_low, sst_high},
                                    IngestExternalFileOptions()));
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);

  std::vector<std::string> live_ssts;
  GetSstFiles(env_, dbname_, &live_ssts);
  ASSERT_EQ(live_ssts.size(), 2);

  // Statuses are collected in the worker threads and asserted only after
  // both threads have been joined.
  Status compact_status;
  port::Thread compact_thread([&]() {
    compact_status =
        db_->CompactFiles(CompactionOptions(), default_cf, live_ssts, 1);
  });

  Status ingest_status;
  port::Thread ingest_thread([&]() {
    ingest_status = db_->IngestExternalFile(default_cf, {sst_mid},
                                            IngestExternalFileOptions());
    TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
  });

  compact_thread.join();
  ingest_thread.join();
  ASSERT_OK(compact_status);
  ASSERT_OK(ingest_status);

  sync_points->DisableProcessing();
  sync_points->ClearAllCallBacks();
}
// Exercises cancellation of DB::CompactFiles() through
// CompactionOptions::canceled and through DisableManualCompaction().
// Phases:
//   1. `canceled` is already true before the call.
//   2. `canceled` is flipped to true from a sync-point callback.
//   3. DisableManualCompaction() is called from the same callback.
//   4. `canceled` is flipped at "CompactFilesImpl:0" and the compaction is
//      still expected to complete.
TEST_F(DBTest2, TestCancelCompactFiles) {
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.env = env_;
options.num_levels = 2;
options.disable_auto_compactions = true;
Reopen(options);
auto* handle = db_->DefaultColumnFamily();
ASSERT_EQ(db_->NumberLevels(handle), 2);
ROCKSDB_NAMESPACE::SstFileWriter sst_file_writer{
ROCKSDB_NAMESPACE::EnvOptions(), options};
std::vector<std::string> external_sst_file_names;
int key_counter = 0;
const int num_keys_per_file = 100000;
const int num_files = 10;
// Build `num_files` external SST files with disjoint, increasing key ranges.
for (int i = 0; i < num_files; ++i) {
std::string file_name =
dbname_ + "/test_compact_files" + std::to_string(i) + ".sst_t";
external_sst_file_names.push_back(file_name);
ASSERT_OK(sst_file_writer.Open(file_name));
for (int j = 0; j < num_keys_per_file; ++j) {
ASSERT_OK(sst_file_writer.Put(Key(j + num_keys_per_file * key_counter),
std::to_string(j)));
}
key_counter += 1;
ASSERT_OK(sst_file_writer.Finish());
}
ASSERT_OK(db_->IngestExternalFile(handle, external_sst_file_names,
IngestExternalFileOptions()));
ASSERT_EQ(NumTableFilesAtLevel(1, 0), num_files);
std::vector<std::string> files;
GetSstFiles(env_, dbname_, &files);
ASSERT_EQ(files.size(), num_files);
// Phase 1: the flag is set before CompactFiles() is called; the call must
// report "manual compaction paused" and leave all files in place.
CompactionOptions compaction_options;
std::atomic<bool> canceled(true);
compaction_options.canceled = &canceled;
ASSERT_TRUE(db_->CompactFiles(compaction_options, handle, files, 1)
.IsManualCompactionPaused());
ASSERT_EQ(NumTableFilesAtLevel(1, 0), num_files);
// Phases 2 and 3 share one callback; `disable_compaction` selects whether it
// cancels through the flag or through DisableManualCompaction(). The callback
// argument is treated as a pointer to an atomic<int> "paused" counter.
bool disable_compaction = false;
compaction_options.canceled->store(false, std::memory_order_release);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TestCancelCompactFiles:SuccessfulCompaction", [&](void* arg) {
auto paused = static_cast<std::atomic<int>*>(arg);
if (disable_compaction) {
db_->DisableManualCompaction();
ASSERT_EQ(1, paused->load(std::memory_order_acquire));
} else {
compaction_options.canceled->store(true, std::memory_order_release);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
}
});
// Phase 2: cancel through the `canceled` flag from inside the callback.
ASSERT_TRUE(db_->CompactFiles(compaction_options, handle, files, 1)
.IsManualCompactionPaused());
ASSERT_EQ(NumTableFilesAtLevel(1, 0), num_files);
// Phase 3: cancel through DisableManualCompaction(); the user-supplied flag
// must remain false afterwards.
disable_compaction = true;
compaction_options.canceled->store(false, std::memory_order_release);
ASSERT_TRUE(db_->CompactFiles(compaction_options, handle, files, 1)
.IsManualCompactionPaused());
ASSERT_EQ(NumTableFilesAtLevel(1, 0), num_files);
ASSERT_FALSE(compaction_options.canceled->load());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
db_->EnableManualCompaction();
// Phase 4: set the flag at "CompactFilesImpl:0". The call is still expected
// to succeed and merge everything into one L1 file.
// NOTE(review): presumably this sync point fires after the last cancellation
// check - confirm against CompactFilesImpl.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactFilesImpl:0", [&](void* ) {
compaction_options.canceled->store(true, std::memory_order_release);
});
compaction_options.canceled->store(false, std::memory_order_release);
ASSERT_OK(db_->CompactFiles(compaction_options, handle, files, 1));
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Verifies that several DBs can be opened concurrently, first as fresh
// (empty) databases and then again after data has been written to them.
TEST_F(DBTest2, MultiDBParallelOpenTest) {
  const int kNumDbs = 2;
  Options options = CurrentOptions();
  std::vector<std::string> dbnames;
  for (int i = 0; i < kNumDbs; ++i) {
    dbnames.emplace_back(test::PerThreadDBPath(env_, "db" + std::to_string(i)));
    ASSERT_OK(DestroyDB(dbnames.back(), options));
  }

  std::vector<std::thread> open_threads;
  std::vector<std::unique_ptr<DB>> dbs(kNumDbs);
  options.create_if_missing = true;

  // Opens every DB on its own thread and joins ALL threads before returning.
  // Joining up front matters: a fatal assertion in the caller returns from
  // the test body, and destroying a still-joinable std::thread would call
  // std::terminate().
  auto open_all_in_parallel = [&]() {
    open_threads.clear();
    for (int i = 0; i < kNumDbs; ++i) {
      open_threads.emplace_back(
          [&](int dbnum) {
            ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
          },
          i);
    }
    for (auto& t : open_threads) {
      t.join();
    }
  };

  // Round 1: create empty DBs in parallel, then write one key and close.
  open_all_in_parallel();
  for (int i = 0; i < kNumDbs; ++i) {
    // The in-thread ASSERT_OK only aborts the worker lambda, so guard against
    // a failed open before dereferencing.
    ASSERT_TRUE(dbs[i] != nullptr);
    ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
    dbs[i].reset();
  }

  // Round 2: reopen the now non-empty DBs in parallel, then clean up.
  open_all_in_parallel();
  for (int i = 0; i < kNumDbs; ++i) {
    ASSERT_TRUE(dbs[i] != nullptr);
    dbs[i].reset();
    ASSERT_OK(DestroyDB(dbnames[i], options));
  }
}
namespace {
class DummyOldStats : public Statistics {
public:
const char* Name() const override { return "DummyOldStats"; }
uint64_t getTickerCount(uint32_t ) const override { return 0; }
void recordTick(uint32_t , uint64_t ) override {
num_rt++;
}
void setTickerCount(uint32_t , uint64_t ) override {}
uint64_t getAndResetTickerCount(uint32_t ) override {
return 0;
}
void measureTime(uint32_t , uint64_t ) override {
num_mt++;
}
void histogramData(
uint32_t ,
ROCKSDB_NAMESPACE::HistogramData* const ) const override {}
std::string getHistogramString(uint32_t ) const override {
return "";
}
bool HistEnabledForType(uint32_t ) const override { return false; }
std::string ToString() const override { return ""; }
std::atomic<int> num_rt{0};
std::atomic<int> num_mt{0};
};
}
// Installs a DummyOldStats object as the DB statistics sink and checks that
// ordinary writes, reads and a flush reach both recordTick() and
// measureTime().
TEST_F(DBTest2, OldStatsInterface) {
  auto dummy_stats = std::make_shared<DummyOldStats>();
  DummyOldStats* const dos = dummy_stats.get();

  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = dummy_stats;
  Reopen(options);

  // Read the key once from the memtable and once after it has been flushed.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_OK(Flush());
  ASSERT_EQ("bar", Get("foo"));

  ASSERT_GT(dos->num_rt, 0);
  ASSERT_GT(dos->num_mt, 0);
}
// Close() must fail while a snapshot is still outstanding and succeed once
// the snapshot has been released.
TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
  const Snapshot* ss = db_->GetSnapshot();

  // Drop any column family handles held by the fixture so that the snapshot
  // is the only thing this test leaves outstanding. The returned Status is
  // checked instead of being silently discarded.
  for (auto h : handles_) {
    ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
  }
  handles_.clear();

  ASSERT_NOK(db_->Close());
  db_->ReleaseSnapshot(ss);
  ASSERT_OK(db_->Close());
  db_.reset();
}
// Re-seeks a single iterator across prefixes with a prefix-only bloom filter
// (whole_key_filtering disabled) and checks that both seeks land on the
// exact key requested.
TEST_F(DBTest2, PrefixBloomReseek) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  // Construct two L1 files with keys:
  // f1:[aaa1 ccc1] f2:[ddd0]
  ASSERT_OK(Put("aaa1", ""));
  ASSERT_OK(Put("ccc1", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("ddd0", ""));
  ASSERT_OK(Flush());
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // "bbb1" is written after the compaction and is not flushed.
  ASSERT_OK(Put("bbb1", ""));

  // Owned by unique_ptr so the iterator is released even when a fatal
  // assertion below returns early from the test body.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter->status());

  // First seek targets the unflushed key.
  iter->Seek("bbb1");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bbb1", iter->key().ToString());

  // Re-seek on the same iterator to a key in the compacted file.
  iter->Seek("ccc1");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("ccc1", iter->key().ToString());
}
// Seeks to a key whose 3-byte prefix ("bbb") is absent from the DB, for every
// combination of Options::prefix_seek_opt_in_only and
// ReadOptions::prefix_same_as_start. Only the (opt_in=true, same=false)
// combination is expected to land on the next key "ccc1"; every other
// combination leaves the iterator invalid.
TEST_F(DBTest2, PrefixBloomFilteredOut) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));

  BlockBasedTableOptions table_opts;
  table_opts.filter_policy.reset(NewBloomFilterPolicy(10, false));
  table_opts.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));

  for (bool opt_in : {false, true}) {
    options.prefix_seek_opt_in_only = opt_in;
    DestroyAndReopen(options);

    // Two flushed files: [aaa1 ccc1] and [ddd0], then a manual compaction
    // that skips the bottommost level.
    ASSERT_OK(Put("aaa1", ""));
    ASSERT_OK(Put("ccc1", ""));
    ASSERT_OK(Flush());
    ASSERT_OK(Put("ddd0", ""));
    ASSERT_OK(Flush());
    CompactRangeOptions compact_opts;
    compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
    ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));

    ReadOptions read_opts;
    for (bool same : {false, true}) {
      read_opts.prefix_same_as_start = same;
      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
      ASSERT_OK(it->status());

      it->Seek("bbb1");
      ASSERT_OK(it->status());

      const bool expect_filtered = !opt_in || same;
      if (expect_filtered) {
        ASSERT_FALSE(it->Valid());
      } else {
        ASSERT_TRUE(it->Valid());
        ASSERT_EQ(it->key(), "ccc1");
      }
    }
  }
}
// Checks row-cache hit/miss accounting when the same key is read through
// different snapshots. "foo" has two versions: "bar1" (visible to s1) and
// "bar2" (visible to s2, s3 and to reads without a snapshot).
TEST_F(DBTest2, RowCacheSnapshot) {
Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.row_cache = NewLRUCache(8 * 8192);
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar1"));
// s1 is taken between the two writes to "foo", so it sees "bar1".
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_OK(Put("foo", "bar2"));
ASSERT_OK(Flush());
// s2 and s3 are taken after unrelated keys are written; both see "bar2".
ASSERT_OK(Put("foo2", "bar"));
const Snapshot* s2 = db_->GetSnapshot();
ASSERT_OK(Put("foo3", "bar"));
const Snapshot* s3 = db_->GetSnapshot();
// Nothing has been read yet.
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
// First read without a snapshot: expected miss.
ASSERT_EQ(Get("foo"), "bar2");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
// Same read again: expected hit.
ASSERT_EQ(Get("foo"), "bar2");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
// Read through s1 returns the older version and counts as a new miss.
ASSERT_EQ(Get("foo", s1), "bar1");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
// Read through s2 returns the newer version and is expected to hit.
ASSERT_EQ(Get("foo", s2), "bar2");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 2);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
// Repeating the s1 read is now expected to hit.
ASSERT_EQ(Get("foo", s1), "bar1");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 3);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
// s3 sees the same version as s2 and is expected to hit as well.
ASSERT_EQ(Get("foo", s3), "bar2");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 4);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 2);
db_->ReleaseSnapshot(s1);
db_->ReleaseSnapshot(s2);
db_->ReleaseSnapshot(s3);
}
// Simulates a crash during recovery of a two-column-family DB whose WAL has
// been corrupted, then verifies that a subsequent clean reopen succeeds.
// The scenario is repeated once per sync point listed below; the filesystem
// is deactivated when that sync point is reached.
TEST_F(DBTest2, CrashInRecoveryMultipleCF) {
const std::vector<std::string> sync_points = {
"DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable",
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"};
for (const auto& test_sync_point : sync_points) {
Options options = CurrentOptions();
DestroyAndReopen(options);
options.create_if_missing = true;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
CreateAndReopenWithCF({"pikachu"}, options);
// Flushed data in both column families.
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Flush(1));
// Unflushed data in both column families; these writes are not flushed
// before Close().
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put(1, "foo", "bar"));
std::string large_value(400, ' ');
ASSERT_OK(Put("foo1", large_value));
ASSERT_OK(Put("foo2", large_value));
Close();
// Corrupt the first WAL file found by overwriting bytes 400 and 401.
// NOTE(review): assumes the WAL is longer than 401 bytes, which the two
// 400-byte values written above should guarantee - confirm.
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
for (const auto& f : filenames) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == FileType::kWalFile) {
std::string fname = dbname_ + "/" + f;
std::string file_content;
ASSERT_OK(ReadFileToString(env_, fname, &file_content));
file_content[400] = 'h';
file_content[401] = 'a';
ASSERT_OK(WriteStringToFile(env_, file_content, fname, false));
break;
}
}
// Reopen through a fault-injection env that goes inactive as soon as the
// chosen sync point fires; the reopen is expected to fail.
FaultInjectionTestEnv fit_env(options.env);
options.env = &fit_env;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
test_sync_point,
[&](void* ) { fit_env.SetFilesystemActive(false); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(TryReopenWithColumnFamilies(
{kDefaultColumnFamilyName, "pikachu"}, options));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Restore the filesystem and the original env; recovery must now succeed.
fit_env.SetFilesystemActive(true);
options.env = env_;
ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
options));
}
}
// Seeks to "e", a position covered by the range tombstone [a, f), and expects
// a total-order iterator to land on "x", the first key of the other file.
TEST_F(DBTest2, SeekFileRangeDeleteTail) {
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewCappedPrefixTransform(1));
  options.num_levels = 3;
  DestroyAndReopen(options);

  // First file: "a", a range deletion over [a, f), and "b" written after the
  // deletion. The snapshot is taken before the range deletion is issued.
  ASSERT_OK(Put("a", "a"));
  const Snapshot* pre_delete_snapshot = db_->GetSnapshot();
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "f"));
  ASSERT_OK(Put("b", "a"));
  ASSERT_OK(Flush());

  // Second file: keys beyond the deleted range.
  ASSERT_OK(Put("x", "a"));
  ASSERT_OK(Put("z", "a"));
  ASSERT_OK(Flush());

  // Move everything down to level 2.
  CompactRangeOptions compact_opts;
  compact_opts.change_level = true;
  compact_opts.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));

  {
    ReadOptions read_opts;
    read_opts.total_order_seek = true;
    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
    ASSERT_OK(it->status());
    it->Seek("e");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("x", it->key().ToString());
  }

  db_->ReleaseSnapshot(pre_delete_snapshot);
}
// With avoid_unnecessary_blocking_io enabled, checks that memory pinned by a
// deleted iterator is not returned to the WriteBufferManager while the single
// HIGH-priority background thread is busy, and is returned once that thread
// has had a chance to run.
TEST_F(DBTest2, BackgroundPurgeTest) {
  Options options = CurrentOptions();
  options.write_buffer_manager =
      std::make_shared<ROCKSDB_NAMESPACE::WriteBufferManager>(1 << 20);
  options.avoid_unnecessary_blocking_io = true;
  DestroyAndReopen(options);
  size_t base_value = options.write_buffer_manager->memory_usage();

  ASSERT_OK(Put("a", "a"));
  // Held in a unique_ptr so the iterator is released even if a fatal
  // assertion returns early; it is destroyed explicitly via reset() below at
  // the point the scenario requires.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter->status());
  ASSERT_OK(Flush());
  size_t value = options.write_buffer_manager->memory_usage();
  ASSERT_GT(value, base_value);

  // Occupy the only HIGH-priority thread with a sleeping task.
  db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
  test::SleepingBackgroundTask sleeping_task_after;
  db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                          &sleeping_task_after, Env::Priority::HIGH);

  // Destroy the iterator while the background thread is blocked; memory usage
  // is expected to stay above the baseline.
  // NOTE(review): presumably the purge is queued behind the sleeping task on
  // the HIGH pool - confirm against the iterator cleanup path.
  iter.reset();

  Env::Default()->SleepForMicroseconds(100000);
  value = options.write_buffer_manager->memory_usage();
  ASSERT_GT(value, base_value);

  sleeping_task_after.WakeUp();
  sleeping_task_after.WaitUntilDone();

  // Run a second task through the same single-thread pool and wait for it.
  // After it completes, memory usage is expected to be back at the baseline.
  test::SleepingBackgroundTask sleeping_task_after2;
  db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                          &sleeping_task_after2, Env::Priority::HIGH);
  sleeping_task_after2.WakeUp();
  sleeping_task_after2.WaitUntilDone();

  value = options.write_buffer_manager->memory_usage();
  ASSERT_EQ(base_value, value);
}
// Runs a flush of the default column family concurrently with background
// compaction of a second column family, using a tiny max_manifest_file_size
// so that manifest writes roll over to a new MANIFEST file.
TEST_F(DBTest2, SwitchMemtableRaceWithNewManifest) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  options.max_manifest_file_size = 10;
  options.max_manifest_space_amp_pct = 0;
  options.create_if_missing = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  ASSERT_OK(Put("foo", "value"));
  // Create enough L0 files in CF 1 to reach the compaction trigger.
  const int kL0Files = options.level0_file_num_compaction_trigger;
  for (int i = 0; i < kL0Files; ++i) {
    ASSERT_OK(Put(1, "a", std::to_string(i)));
    ASSERT_OK(Flush(1));
  }

  port::Thread thread([&]() { ASSERT_OK(Flush()); });
  // Capture the status and join BEFORE asserting: a fatal assertion returns
  // from the test body, and destroying a joinable thread would terminate the
  // whole test process.
  const Status wait_status = dbfull()->TEST_WaitForCompact();
  thread.join();
  ASSERT_OK(wait_status);
}
// Builds several files that all contain only the user key "key" (a base file
// moved to level 2 plus four single-merge L0 files), waits for compaction,
// and checks the resulting LSM shape and the fully merged value.
TEST_F(DBTest2, SameSmallestInSameLevel) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Base file: two puts followed by two merge operands, pushed to level 2.
  ASSERT_OK(Put("key", "1"));
  ASSERT_OK(Put("key", "2"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key", "3"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key", "4"));
  ASSERT_OK(Flush());

  CompactRangeOptions to_level2;
  to_level2.change_level = true;
  to_level2.target_level = 2;
  ASSERT_OK(dbfull()->CompactRange(to_level2, db_->DefaultColumnFamily(),
                                   nullptr, nullptr));

  // Four more files, each holding exactly one merge operand for "key".
  for (const char* operand : {"5", "6", "7", "8"}) {
    ASSERT_OK(db_->Merge(WriteOptions(), "key", operand));
    ASSERT_OK(Flush());
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,4,1", FilesPerLevel());
  ASSERT_EQ("2,3,4,5,6,7,8", Get("key"));
}
// Injects a Corruption status at the
// "VersionBuilder::CheckConsistencyBeforeReturn" sync point and verifies that
// reopening the DB with force_consistency_checks enabled fails.
TEST_F(DBTest2, FileConsistencyCheckInOpen) {
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
        Status* ret_s = static_cast<Status*>(arg);
        *ret_s = Status::Corruption("fcc");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  const Status reopen_status = TryReopen(options);

  // Tear the sync-point state down before asserting so the injected
  // corruption callback is removed even when the assertion below fails, as
  // the other sync-point tests in this file do.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_NOK(reopen_status);
}
// SeekForPrev() against a block-based table that uses a hash (prefix) index
// with a 1-byte fixed prefix and small blocks. Seeks inside existing
// prefixes must return the expected key; seeks inside non-existent prefixes
// may return nothing, but if they return a key it must be "a1".
TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 300;
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  table_options.index_shortening =
      BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  Reopen(options);

  // 500-byte values with a 300-byte block size.
  Random rnd(301);
  const std::string big_value = rnd.RandomString(500);
  ASSERT_OK(Put("a1", big_value));
  ASSERT_OK(Put("x1", big_value));
  ASSERT_OK(Put("y1", big_value));
  ASSERT_OK(Flush());

  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    ASSERT_OK(it->status());

    // Prefixes that exist in the table.
    it->SeekForPrev("x3");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("x1", it->key().ToString());

    it->SeekForPrev("a3");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("a1", it->key().ToString());

    it->SeekForPrev("y3");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("y1", it->key().ToString());

    // Prefixes that do not exist: an invalid iterator is acceptable, but a
    // valid one must point at "a1".
    for (const char* absent_prefix_key : {"b1", "c1", "d1"}) {
      it->SeekForPrev(absent_prefix_key);
      if (it->Valid()) {
        ASSERT_EQ("a1", it->key().ToString());
      }
    }

    // The iterator must still work for an existing prefix afterwards.
    it->SeekForPrev("y3");
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ("y1", it->key().ToString());
  }
}
// Issues repeated Get() calls against a table with a two-level (partitioned)
// index while the test env randomly fails reads. Both the table cache and
// the block cache have zero capacity. A Get() must fail exactly when the env
// reported at least one injected read failure during that call.
TEST_F(DBTest2, PartitionedIndexPrefetchFailure) {
  Options options = last_options_;
  options.env = env_;
  options.max_open_files = 20;

  BlockBasedTableOptions table_opts;
  table_opts.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_opts.metadata_block_size = 128;
  table_opts.block_size = 128;
  table_opts.block_cache = NewLRUCache(16777216);
  table_opts.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));
  DestroyAndReopen(options);

  // Shrink both caches to zero capacity.
  dbfull()->TEST_table_cache()->SetCapacity(0);
  table_opts.block_cache->SetCapacity(0);

  Random rnd(301);
  for (int key_index = 0; key_index < 4096; key_index++) {
    ASSERT_OK(Put(Key(key_index), rnd.RandomString(32)));
  }
  ASSERT_OK(Flush());

  for (int attempt = 0; attempt < 300; attempt++) {
    // Reset the failure counter and (re)arm random read failures.
    env_->num_reads_fails_ = 0;
    env_->rand_reads_fail_odd_ = 8;

    std::string value;
    Status get_status = dbfull()->Get(ReadOptions(), Key(1), &value);
    const bool read_failure_injected = env_->num_reads_fails_ > 0;
    if (!read_failure_injected) {
      ASSERT_OK(get_status);
    } else {
      ASSERT_NOK(get_status);
    }
  }

  // Disarm failure injection.
  env_->rand_reads_fail_odd_ = 0;
}
// Writes an SST file with a 2-byte fixed prefix extractor, reopens the DB
// with a 1-byte extractor, and verifies that seeks still return correct keys.
// For the non-partitioned filter case it also checks the
// NON_LAST_LEVEL_SEEK_FILTER_MATCH ticker after each seek. After a forced
// compaction (which rewrites the data under the current extractor) the same
// kinds of seeks are repeated.
TEST_F(DBTest2, ChangePrefixExtractor) {
  for (bool use_partitioned_filter : {true, false}) {
    // create a DB with block prefix index
    BlockBasedTableOptions table_options;
    Options options = CurrentOptions();
    options.prefix_seek_opt_in_only = false;

    // Ticker expectations are only checked for the non-partitioned filter.
    bool expect_filter_check = !use_partitioned_filter;

    table_options.partition_filters = use_partitioned_filter;
    if (use_partitioned_filter) {
      table_options.index_type =
          BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    }
    table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));

    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    options.statistics = CreateDBStatistics();

    options.prefix_extractor.reset(NewFixedPrefixTransform(2));
    DestroyAndReopen(options);

    ASSERT_OK(Put("aa", ""));
    ASSERT_OK(Put("xb", ""));
    ASSERT_OK(Put("xx1", ""));
    ASSERT_OK(Put("xz1", ""));
    ASSERT_OK(Put("zz", ""));
    ASSERT_OK(Flush());

    // Switch to a 1-byte extractor; the existing file was built with the
    // 2-byte one.
    options.prefix_extractor.reset(NewFixedPrefixTransform(1));
    Reopen(options);

    // Unbounded seeks: no filter match is expected to be recorded.
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
      ASSERT_OK(iterator->status());
      iterator->Seek("xa");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(0, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }

      iterator->Seek("xz");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xz1", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(0, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
    }

    // `ub` is referenced by `ro`; reassigning `ub` below changes the bound
    // seen by iterators created afterwards.
    std::string ub_str = "xg9";
    Slice ub(ub_str);
    ReadOptions ro;
    ro.iterate_upper_bound = &ub;

    // SeekForPrev with upper bound "xg9".
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
      ASSERT_OK(iterator->status());

      iterator->SeekForPrev("xg0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(0, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
    }

    // Upper bound "xx9": only the seek to "xx0" is expected to record one
    // filter match.
    // NOTE(review): presumably because the seek key and the bound share the
    // 2-byte prefix the file was built with - confirm.
    ub_str = "xx9";
    ub = Slice(ub_str);
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
      ASSERT_OK(iterator->status());

      iterator->Seek("x");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(0, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }

      iterator->Seek("xx0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xx1", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
    }

    // Force the bottommost level to be rewritten; the compaction is issued
    // twice, as in the original test.
    CompactRangeOptions compact_range_opts;
    compact_range_opts.bottommost_level_compaction =
        BottommostLevelCompaction::kForce;
    ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
    ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));

    // After compaction every seek below is expected to record exactly one
    // filter match.
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
      iterator->Seek("x");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }

      iterator->Seek("xg");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xx1", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }

      iterator->Seek("xz");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xz1", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
      ASSERT_OK(iterator->status());
    }

    // Upper bound is still "xx9" here.
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));

      iterator->SeekForPrev("xx0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }

      iterator->Seek("xx0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xx1", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
      ASSERT_OK(iterator->status());
    }

    // Back to upper bound "xg9".
    ub_str = "xg9";
    ub = Slice(ub_str);
    {
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
      iterator->SeekForPrev("xg0");
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ("xb", iterator->key().ToString());
      if (expect_filter_check) {
        EXPECT_EQ(1, PopTicker(options, NON_LAST_LEVEL_SEEK_FILTER_MATCH));
      }
      ASSERT_OK(iterator->status());
    }
  }
}
// Point lookup with a hash (prefix) index when several newer files do not
// contain the looked-up prefix. "b1" lives only in the oldest file; five
// newer files hold only "a*" and "c*" keys. Get("b1") must still find it.
TEST_F(DBTest2, BlockBasedTablePrefixGetIndexNotFound) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 300;
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  table_options.index_shortening =
      BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  // Six files are flushed below; a trigger of 8 is above that count.
  options.level0_file_num_compaction_trigger = 8;
  Reopen(options);

  // Oldest file: the only one containing prefix "b".
  ASSERT_OK(Put("b1", "ok"));
  ASSERT_OK(Flush());

  // Five newer files, each with one "a" key and one "c" key.
  for (int file_no = 1; file_no <= 5; ++file_no) {
    const std::string suffix = std::to_string(file_no);
    ASSERT_OK(Put("a" + suffix, ""));
    ASSERT_OK(Put("c" + suffix, ""));
    ASSERT_OK(Flush());
  }

  ASSERT_EQ("ok", Get("b1"));
}
// Exercises ReadOptions::auto_prefix_mode with a prefix bloom filter across
// every option configuration produced by ChangeOptions(kSkipPlainTable).
// After each seek the test reads and resets two tickers:
//   hit_stat  - *_SEEK_FILTER_MATCH
//   miss_stat - *_SEEK_FILTERED
// A miss_stat value of 1 means the seek was filtered out; 0/0 means the
// filter recorded nothing for that seek.
TEST_F(DBTest2, AutoPrefixMode1) {
do {
Options options = CurrentOptions();
BlockBasedTableOptions table_options =
*options.table_factory->GetOptions<BlockBasedTableOptions>();
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
options.statistics = CreateDBStatistics();
Reopen(options);
Random rnd(301);
std::string large_value = rnd.RandomString(500);
// One flushed file with prefixes "a", "x" and "y"; no key has prefix "b".
ASSERT_OK(Put("a1", large_value));
ASSERT_OK(Put("x1", large_value));
ASSERT_OK(Put("y1", large_value));
ASSERT_OK(Flush());
ReadOptions ro;
ro.total_order_seek = false;
ro.auto_prefix_mode = true;
// The ticker pair depends on whether the DB has a single level.
const auto hit_stat = options.num_levels == 1
? LAST_LEVEL_SEEK_FILTER_MATCH
: NON_LAST_LEVEL_SEEK_FILTER_MATCH;
const auto miss_stat = options.num_levels == 1
? LAST_LEVEL_SEEK_FILTERED
: NON_LAST_LEVEL_SEEK_FILTERED;
// No upper bound: the seek finds the next key and neither ticker moves.
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek("b1");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("x1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
// From here on `ro` points at `ub`; assigning to `ub` changes the bound.
Slice ub;
ro.iterate_upper_bound = &ub;
// Upper bound "b9" (same 1-byte prefix as the seek key): filtered.
ub = "b9";
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek("b1");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
// Upper bound "z" (different prefix): result found, no ticker moves.
ub = "z";
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek("b1");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("x1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
// Upper bound "c": the seek is expected to be filtered.
// NOTE(review): presumably "c" is treated as the immediate successor of
// prefix "b" - confirm against the auto_prefix_mode documentation.
ub = "c";
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek("b1");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
// Upper bound "c1": nothing is in range, but the filter records nothing.
ub = "c1";
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek("b1");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
// Same cases again, but changing the upper bound between seeks on ONE
// iterator, followed by SeekForPrev / SeekToLast / SeekToFirst.
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
ub = "b9";
iterator->Seek("b1");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
ub = "z";
iterator->Seek("b1");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("x1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "c";
iterator->Seek("b1");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ub = "b9";
iterator->SeekForPrev("b1");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("a1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "zz";
iterator->SeekToLast();
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("y1", iterator->key().ToString());
iterator->SeekToFirst();
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("a1", iterator->key().ToString());
}
// Repeat with a reverse bytewise comparator: iteration order is reversed,
// so the "upper bound" is the lexicographically smaller key.
options.comparator = ReverseBytewiseComparator();
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
DestroyAndReopen(options);
ASSERT_OK(Put("a1", large_value));
ASSERT_OK(Put("x1", large_value));
ASSERT_OK(Put("y1", large_value));
ASSERT_OK(Flush());
{
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
// Seek key and bound share prefix "b": filtered.
ub = "b1";
iterator->Seek("b9");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
ub = "b1";
iterator->Seek("z");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("y1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
// The remaining Seek cases expect both tickers to stay at zero.
ub = "b1";
iterator->Seek("c");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "b";
iterator->Seek("c9");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "a";
iterator->Seek("b9");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("a1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "b";
iterator->Seek("a");
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "b1";
iterator->SeekForPrev("b9");
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("x1", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ub = "a";
iterator->SeekToLast();
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("a1", iterator->key().ToString());
iterator->SeekToFirst();
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("y1", iterator->key().ToString());
}
// Back to the forward comparator; test 2-byte fixed and capped extractors
// with keys that are shorter than the prefix length or contain 0xff / 0x00
// bytes.
options.comparator = BytewiseComparator();
for (const auto config : {"fixed:2", "capped:2"}) {
ASSERT_OK(SliceTransform::CreateFromString(ConfigOptions(), config,
&options.prefix_extractor));
// Skip the fixed-length extractor for hash index / Hash* memtables.
// NOTE(review): presumably because those configurations cannot handle the
// 1-byte keys written below, which are shorter than the prefix - confirm.
if (StartsWith(config, "fixed") &&
(table_options.index_type == BlockBasedTableOptions::kHashSearch ||
StartsWith(options.memtable_factory->Name(), "Hash"))) {
continue;
}
DestroyAndReopen(options);
// Only the first 4 / 2 / 3 bytes of these literals are used below via
// explicit Slice lengths (b_begin_stuff contains an embedded NUL).
const char* a_end_stuff = "a\xffXYZ";
const char* b_begin_stuff = "b\x00XYZ";
ASSERT_OK(Put("a", large_value));
ASSERT_OK(Put("b", large_value));
ASSERT_OK(Put(Slice(b_begin_stuff, 3), large_value));
ASSERT_OK(Put("c", large_value));
ASSERT_OK(Flush());
// Seek "a\xff" with bound "a\xffXY": expected to be filtered.
ub = Slice(a_end_stuff, 4);
ro.iterate_upper_bound = &ub;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
iterator->Seek(Slice(a_end_stuff, 2));
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
// Seek "a\xff" with bound "b\x00": the test expects this to be filtered as
// well, even though key "b" sorts between the seek key and the bound.
ub = Slice(b_begin_stuff, 2);
ro.iterate_upper_bound = &ub;
iterator->Seek(Slice(a_end_stuff, 2));
ASSERT_FALSE(iterator->Valid());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(1, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
// With total_order_seek and auto_prefix_mode off, the same seek finds "b"
// and records no filter activity.
ReadOptions tos_ro = ro;
tos_ro.total_order_seek = true;
tos_ro.auto_prefix_mode = false;
iterator.reset(db_->NewIterator(tos_ro));
iterator->Seek(Slice(a_end_stuff, 2));
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ("b", iterator->key().ToString());
EXPECT_EQ(0, TestGetAndResetTickerCount(options, hit_stat));
EXPECT_EQ(0, TestGetAndResetTickerCount(options, miss_stat));
ASSERT_OK(iterator->status());
}
} while (ChangeOptions(kSkipPlainTable));
}
// Value-parameterized fixture whose parameter is the name of a sync point.
// SetupSyncPoints() registers a callback at that sync point which overwrites
// the Status passed to it with an injected IOError.
class RenameCurrentTest : public DBTestBase,
                          public testing::WithParamInterface<std::string> {
 public:
  RenameCurrentTest()
      : DBTestBase("rename_current_test", true), sync_point_(GetParam()) {}

  ~RenameCurrentTest() override = default;

  // Raise the env's no_file_overwrite_ flag for the duration of each test.
  void SetUp() override {
    env_->no_file_overwrite_.store(true, std::memory_order_release);
  }

  // Lower the flag again once the test is done.
  void TearDown() override {
    env_->no_file_overwrite_.store(false, std::memory_order_release);
  }

  // Registers the error-injecting callback. Processing is switched off here;
  // each test enables it explicitly around the operation it wants to fail.
  void SetupSyncPoints() {
    auto* const sync_points = SyncPoint::GetInstance();
    sync_points->DisableProcessing();
    sync_points->SetCallBack(sync_point_, [](void* arg) {
      // The sync point hands over a pointer to the Status to be overridden.
      auto* const status = static_cast<Status*>(arg);
      assert(status);
      *status = Status::IOError("Injected IO error.");
    });
  }

  // Name of the sync point at which the IOError is injected.
  const std::string sync_point_;
};
// Run every RenameCurrentTest case once per sync point name listed below; the
// fixture picks the name up through GetParam() and injects an IOError there.
INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
::testing::Values("SetCurrentFile:BeforeRename",
"SetCurrentFile:AfterRename"));
// Opening a fresh DB must fail while the injected IOError is active and must
// succeed once sync point processing has been turned off again.
TEST_P(RenameCurrentTest, Open) {
  Destroy(last_options_);

  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;

  SetupSyncPoints();
  auto* const sync_points = SyncPoint::GetInstance();
  sync_points->EnableProcessing();
  ASSERT_NOK(TryReopen(opts));
  sync_points->DisableProcessing();

  // Without the injected failure the same open goes through.
  Reopen(opts);
}
// A flush that hits the injected IOError must fail, further writes must be
// rejected, and after reopening only the data written before the failure is
// visible.
TEST_P(RenameCurrentTest, Flush) {
  Destroy(last_options_);

  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  // NOTE(review): a 1-byte manifest size limit presumably forces a manifest
  // roll (and with it a CURRENT file update) on the flush below -- confirm.
  opts.max_manifest_file_size = 1;
  opts.max_manifest_space_amp_pct = 0;
  Reopen(opts);
  ASSERT_OK(Put("key", "value"));

  SetupSyncPoints();
  auto* const sync_points = SyncPoint::GetInstance();
  sync_points->EnableProcessing();
  ASSERT_NOK(Flush());
  ASSERT_NOK(Put("foo", "value"));
  sync_points->DisableProcessing();

  Reopen(opts);
  ASSERT_EQ("value", Get("key"));
  ASSERT_EQ("NOT_FOUND", Get("foo"));
}
// A manual compaction that hits the injected IOError must fail, further
// writes must be rejected, and after reopening the previously flushed data is
// still readable.
TEST_P(RenameCurrentTest, Compaction) {
  Destroy(last_options_);

  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  // NOTE(review): a 1-byte manifest size limit presumably forces a manifest
  // roll (and with it a CURRENT file update) on the compaction -- confirm.
  opts.max_manifest_file_size = 1;
  opts.max_manifest_space_amp_pct = 0;
  Reopen(opts);

  // Two flushed files with interleaved key ranges.
  ASSERT_OK(Put("a", "a_value"));
  ASSERT_OK(Put("c", "c_value"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("b", "b_value"));
  ASSERT_OK(Put("d", "d_value"));
  ASSERT_OK(Flush());

  SetupSyncPoints();
  auto* const sync_points = SyncPoint::GetInstance();
  sync_points->EnableProcessing();
  const CompactRangeOptions compact_opts;
  ASSERT_NOK(db_->CompactRange(compact_opts, nullptr, nullptr));
  ASSERT_NOK(Put("foo", "value"));
  sync_points->DisableProcessing();

  Reopen(opts);
  ASSERT_EQ("NOT_FOUND", Get("foo"));
  ASSERT_EQ("d_value", Get("d"));
}
// Checks the temperature requested for each kind of file the DB writes (WAL,
// MANIFEST, other metadata, SST) for every combination of the FileSystem
// "optimize" hooks and the explicit temperature options, and checks how many
// files of each type are created along the way.
TEST_F(DBTest2, VariousFileTemperatures) {
  constexpr size_t kNumberFileTypes = static_cast<size_t>(kBlobFile) + 1U;

  // FileSystem wrapper that
  //  * asserts the temperature requested for newly created WAL, MANIFEST and
  //    other metadata files against the expected_* members, and
  //  * keeps a net per-FileType count of files created/renamed since the last
  //    PopCounts() or Reset().
  struct MyTestFS : public FileTemperatureTestFS {
    explicit MyTestFS(const std::shared_ptr<FileSystem>& fs)
        : FileTemperatureTestFS(fs) {
      Reset();
    }

    // Reports a mismatch between the requested and the expected temperature
    // for `fname` on stderr and then trips an assertion. Using one helper for
    // every file type keeps the printed "expected" value in sync with the
    // value that was actually compared.
    static void CheckTemperature(const std::string& fname, Temperature actual,
                                 Temperature expected) {
      if (actual != expected) {
        std::cerr << "Attempt to open " << fname << " with temperature "
                  << temperature_to_string[actual] << " rather than "
                  << temperature_to_string[expected] << std::endl;
        assert(false);
      }
    }

    IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
                             std::unique_ptr<FSWritableFile>* result,
                             IODebugContext* dbg) override {
      IOStatus ios =
          FileTemperatureTestFS::NewWritableFile(fname, opts, result, dbg);
      if (ios.ok()) {
        uint64_t number;
        FileType type;
        // Files whose names cannot be parsed are neither checked nor counted.
        if (ParseFileName(GetFileName(fname), &number, "LOG", &type)) {
          if (type == kTableFile) {
            // Table file temperatures are not checked here; the test body
            // inspects them through CountCurrentSstFilesByTemp().
          } else if (type == kWalFile) {
            CheckTemperature(fname, opts.temperature,
                             expected_wal_temperature);
          } else if (type == kDescriptorFile) {
            CheckTemperature(fname, opts.temperature,
                             expected_manifest_temperature);
          } else {
            CheckTemperature(fname, opts.temperature,
                             expected_other_metadata_temperature);
          }
          UpdateCount(type, 1);
        }
      }
      return ios;
    }

    IOStatus RenameFile(const std::string& src, const std::string& dst,
                        const IOOptions& options,
                        IODebugContext* dbg) override {
      IOStatus ios = FileTemperatureTestFS::RenameFile(src, dst, options, dbg);
      if (ios.ok()) {
        uint64_t number;
        FileType src_type;
        FileType dst_type;
        // Parse outside of assert(): with NDEBUG the assert expression is not
        // evaluated, which would leave src_type/dst_type uninitialized.
        const bool src_parsed =
            ParseFileName(GetFileName(src), &number, "LOG", &src_type);
        const bool dst_parsed =
            ParseFileName(GetFileName(dst), &number, "LOG", &dst_type);
        assert(src_parsed);
        assert(dst_parsed);
        // A rename moves one count from the source type to the target type.
        if (src_parsed) {
          UpdateCount(src_type, -1);
        }
        if (dst_parsed) {
          UpdateCount(dst_type, 1);
        }
      }
      return ios;
    }

    // Adds `delta` to the counter of `type`.
    void UpdateCount(FileType type, int delta) {
      size_t i = static_cast<size_t>(type);
      assert(i < kNumberFileTypes);
      counts[i].FetchAddRelaxed(delta);
    }

    // Returns the counters and resets them all to zero. Only file types with
    // a positive count appear in the result.
    std::map<FileType, size_t> PopCounts() {
      std::map<FileType, size_t> ret;
      for (size_t i = 0; i < kNumberFileTypes; ++i) {
        int c = counts[i].ExchangeRelaxed(0);
        if (c > 0) {
          ret[static_cast<FileType>(i)] = c;
        }
      }
      return ret;
    }

    // Overrides the WAL temperature when optimize_wal_temperature is set to
    // something other than kUnknown.
    FileOptions OptimizeForLogWrite(
        const FileOptions& file_options,
        const DBOptions& /*db_options*/) const override {
      FileOptions opts = file_options;
      if (optimize_wal_temperature != Temperature::kUnknown) {
        opts.temperature = optimize_wal_temperature;
      }
      return opts;
    }

    // Overrides the MANIFEST temperature when optimize_manifest_temperature
    // is set to something other than kUnknown.
    FileOptions OptimizeForManifestWrite(
        const FileOptions& file_options) const override {
      FileOptions opts = file_options;
      if (optimize_manifest_temperature != Temperature::kUnknown) {
        opts.temperature = optimize_manifest_temperature;
      }
      return opts;
    }

    // Restores all knobs and expectations to kUnknown and zeroes the counts.
    void Reset() {
      optimize_manifest_temperature = Temperature::kUnknown;
      optimize_wal_temperature = Temperature::kUnknown;
      expected_manifest_temperature = Temperature::kUnknown;
      expected_other_metadata_temperature = Temperature::kUnknown;
      expected_wal_temperature = Temperature::kUnknown;
      for (auto& c : counts) {
        c.StoreRelaxed(0);
      }
    }

    // Temperatures applied by the Optimize* hooks (kUnknown == leave as is).
    Temperature optimize_manifest_temperature;
    Temperature optimize_wal_temperature;
    // Temperatures NewWritableFile() expects to be asked for.
    Temperature expected_manifest_temperature;
    Temperature expected_other_metadata_temperature;
    Temperature expected_wal_temperature;
    // Net number of files per FileType, indexed by the FileType value.
    std::array<RelaxedAtomic<int>, kNumberFileTypes> counts;
  };

  auto test_fs = std::make_shared<MyTestFS>(env_->GetFileSystem());
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));

  for (bool use_optimize : {false, true}) {
    std::cerr << "use_optimize: " << std::to_string(use_optimize) << std::endl;
    for (bool use_temp_options : {false, true}) {
      std::cerr << "use_temp_options: " << std::to_string(use_temp_options)
                << std::endl;

      Options options = CurrentOptions();
      options.compaction_style = kCompactionStyleUniversal;
      options.env = env.get();
      test_fs->Reset();
      if (use_optimize) {
        test_fs->optimize_manifest_temperature = RandomKnownTemperature();
        test_fs->expected_manifest_temperature =
            test_fs->optimize_manifest_temperature;
        test_fs->optimize_wal_temperature = RandomKnownTemperature();
        test_fs->expected_wal_temperature = test_fs->optimize_wal_temperature;
      }
      if (use_temp_options) {
        // Set after the optimize-hook expectations, so when both are enabled
        // the test expects the explicit options to win.
        options.metadata_write_temperature = RandomKnownTemperature();
        test_fs->expected_manifest_temperature =
            options.metadata_write_temperature;
        test_fs->expected_other_metadata_temperature =
            options.metadata_write_temperature;
        options.wal_write_temperature = RandomKnownTemperature();
        test_fs->expected_wal_temperature = options.wal_write_temperature;
        options.last_level_temperature = RandomKnownTemperature();
        options.default_write_temperature = RandomKnownTemperature();
      }

      DestroyAndReopen(options);
      // Make sure the DB is closed before `env` goes away, even when an
      // assertion below returns early.
      Defer closer([&] { Close(); });

      // Files created by opening a fresh DB.
      using FTC = std::map<FileType, size_t>;
      ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 1},
                                           {kDescriptorFile, 2},
                                           {kCurrentFile, 2},
                                           {kIdentityFile, 1},
                                           {kOptionsFile, 1}}));

      using TCM = std::map<Temperature, size_t>;
      ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(), TCM({}));

      // Two flushed files carry the default write temperature.
      ASSERT_OK(Put("foo", "1"));
      ASSERT_OK(Put("bar", "1"));
      ASSERT_OK(Flush());
      ASSERT_OK(Put("foo", "2"));
      ASSERT_OK(Put("bar", "2"));
      ASSERT_OK(Flush());
      ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(),
                TCM({{options.default_write_temperature, 2}}));

      // After a full compaction a single file with the last level temperature
      // remains.
      ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
      ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
      ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(),
                TCM({{options.last_level_temperature, 1}}));

      ASSERT_OK(Put("foo", "3"));
      ASSERT_OK(Put("bar", "3"));
      ASSERT_OK(Flush());
      // Left unflushed on purpose: the Reopen() below is expected to create
      // one table file.
      ASSERT_OK(Put("dog", "3"));
      {
        // Built with += because both temperatures may be the same value.
        TCM expected;
        expected[options.default_write_temperature] += 1;
        expected[options.last_level_temperature] += 1;
        ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(), expected);
      }
      ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 3}, {kTableFile, 4}}));

      // Files created by reopening the existing DB.
      Reopen(options);
      ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 1},
                                           {kTableFile, 1},
                                           {kDescriptorFile, 1},
                                           {kCurrentFile, 1},
                                           {kOptionsFile, 1}}));
      Destroy(options);
    }
  }
}
// With level compaction and last_level_temperature = kWarm, files compacted
// into the last level are expected to be warm while freshly flushed L0 files
// keep the unknown temperature. The test checks the file metadata, the
// temperature reported to file I/O listeners, the per-temperature SST size
// helper, and the per-temperature read statistics, including across reopens.
TEST_F(DBTest2, LastLevelTemperature) {
  // Records the temperature reported with each file I/O notification for
  // table files and verifies that a given file always reports the same one.
  class TestListener : public EventListener {
   public:
    void OnFileReadFinish(const FileOperationInfo& info) override {
      UpdateFileTemperature(info);
    }

    void OnFileWriteFinish(const FileOperationInfo& info) override {
      UpdateFileTemperature(info);
    }

    void OnFileFlushFinish(const FileOperationInfo& info) override {
      UpdateFileTemperature(info);
    }

    void OnFileSyncFinish(const FileOperationInfo& info) override {
      UpdateFileTemperature(info);
    }

    void OnFileCloseFinish(const FileOperationInfo& info) override {
      UpdateFileTemperature(info);
    }

    bool ShouldBeNotifiedOnFileIO() override { return true; }

    // Table file number -> temperature first reported for that file.
    std::unordered_map<uint64_t, Temperature> file_temperatures;

   private:
    void UpdateFileTemperature(const FileOperationInfo& info) {
      auto filename = GetFileName(info.path);
      uint64_t number;
      FileType type;
      ASSERT_TRUE(ParseFileName(filename, &number, &type));
      if (type == kTableFile) {
        MutexLock l(&mutex_);
        auto ret = file_temperatures.insert({number, info.temperature});
        if (!ret.second) {
          // Already seen: the temperature must not have changed.
          ASSERT_TRUE(ret.first->second == info.temperature);
        }
      }
    }

    // Strips the directory part, handling both kFilePathSeparator and '/'.
    std::string GetFileName(const std::string& fname) {
      auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
      filename = filename.substr(filename.find_last_of('/') + 1);
      return filename;
    }

    port::Mutex mutex_;
  };

  const int kNumLevels = 7;
  const int kLastLevel = kNumLevels - 1;

  // Ownership is handed to options.listeners below.
  auto* listener = new TestListener();

  Options options = CurrentOptions();
  options.last_level_temperature = Temperature::kWarm;
  options.level0_file_num_compaction_trigger = 2;
  options.level_compaction_dynamic_level_bytes = true;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.listeners.emplace_back(listener);
  Reopen(options);

  // Nothing on disk yet.
  auto size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kHot);
  ASSERT_EQ(size, 0);

  // Two flushes reach the L0 compaction trigger.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  get_iostats_context()->Reset();
  IOStatsContext* iostats = get_iostats_context();

  // A single warm file in the last level.
  ColumnFamilyMetaData metadata;
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(1, metadata.file_count);
  SstFileMetaData meta = metadata.levels[kLastLevel].files[0];
  ASSERT_EQ(Temperature::kWarm, meta.temperature);
  uint64_t number;
  FileType type;
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);

  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);
  // The iostats context was just reset, so nothing has been counted yet.
  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);

  // Reading from the warm file counts as exactly one warm read.
  ASSERT_EQ("bar", Get("foo"));

  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
  ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);

  // Add a non-last-level file; reading from it must not change the
  // hot/warm/cold counters.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());

  ASSERT_EQ("bar", Get("bar"));

  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
  ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);

  // One unknown-temperature L0 file plus the warm last-level file.
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(2, metadata.file_count);
  meta = metadata.levels[0].files[0];
  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);

  meta = metadata.levels[kLastLevel].files[0];
  ASSERT_EQ(Temperature::kWarm, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);

  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);

  // The temperatures survive a reopen.
  Reopen(options);
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(2, metadata.file_count);
  meta = metadata.levels[0].files[0];
  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);

  meta = metadata.levels[kLastLevel].files[0];
  ASSERT_EQ(Temperature::kWarm, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kHot);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kCold);
  ASSERT_EQ(size, 0);

  // The size property queried for temperature value 22 reads as 0
  // (presumably no Temperature has that value -- confirm).
  std::string prop;
  ASSERT_TRUE(dbfull()->GetProperty(
      DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
      &prop));
  ASSERT_EQ(std::atoi(prop.c_str()), 0);

  // And once more after a second reopen.
  Reopen(options);
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(2, metadata.file_count);
  meta = metadata.levels[0].files[0];
  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);

  meta = metadata.levels[kLastLevel].files[0];
  ASSERT_EQ(Temperature::kWarm, meta.temperature);
  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
}
// Universal compaction variant of the last level temperature test. The DB is
// first populated without last_level_temperature (all files unknown), then
// the option is set to kWarm via reopen and to kCold via SetOptions(); each
// time existing files keep their temperature until a full compaction rewrites
// them. Finally, an out-of-range temperature must be rejected on open.
TEST_F(DBTest2, LastLevelTemperatureUniversal) {
  const int kTriggerNum = 3;
  const int kNumLevels = 5;
  const int kBottommostLevel = kNumLevels - 1;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.level0_file_num_compaction_trigger = kTriggerNum;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);

  // Nothing on disk yet.
  auto size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kHot);
  ASSERT_EQ(size, 0);
  get_iostats_context()->Reset();
  IOStatsContext* iostats = get_iostats_context();

  // Enough flushes to trigger a compaction.
  for (int i = 0; i < kTriggerNum; i++) {
    ASSERT_OK(Put("foo", "bar"));
    ASSERT_OK(Put("bar", "bar"));
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // last_level_temperature is not set, so the bottommost file is unknown.
  ColumnFamilyMetaData metadata;
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(1, metadata.file_count);
  ASSERT_EQ(Temperature::kUnknown,
            metadata.levels[kBottommostLevel].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);

  // Reading an unknown-temperature file touches none of the hot/warm/cold
  // counters.
  ASSERT_EQ("bar", Get("foo"));

  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);

  // One more flushed file, still unknown.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(2, metadata.file_count);
  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);

  // Changing the option by itself does not retag existing files.
  options.last_level_temperature = Temperature::kWarm;
  Reopen(options);
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(Temperature::kUnknown,
            metadata.levels[kBottommostLevel].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);

  // A full compaction rewrites everything into a single warm file.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(1, metadata.file_count);
  ASSERT_EQ(Temperature::kWarm,
            metadata.levels[kBottommostLevel].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);

  // A newly flushed file is unknown again; the warm file stays.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(2, metadata.file_count);
  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kHot);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kCold);
  ASSERT_EQ(size, 0);

  // The size property queried for temperature value 22 reads as 0
  // (presumably no Temperature has that value -- confirm).
  std::string prop;
  ASSERT_TRUE(dbfull()->GetProperty(
      DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
      &prop));
  ASSERT_EQ(std::atoi(prop.c_str()), 0);

  // Switch the option dynamically; existing files are again left untouched.
  auto s = db_->SetOptions({{"last_level_temperature", "kCold"}});
  ASSERT_OK(s);
  ASSERT_EQ(db_->GetOptions().last_level_temperature, Temperature::kCold);
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(Temperature::kWarm,
            metadata.levels[kBottommostLevel].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);
  size = GetSstSizeHelper(Temperature::kCold);
  ASSERT_EQ(size, 0);

  // The next full compaction produces a single cold file.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  db_->GetColumnFamilyMetaData(&metadata);
  ASSERT_EQ(1, metadata.file_count);
  ASSERT_EQ(Temperature::kCold,
            metadata.levels[kBottommostLevel].files[0].temperature);
  size = GetSstSizeHelper(Temperature::kUnknown);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_EQ(size, 0);
  size = GetSstSizeHelper(Temperature::kCold);
  ASSERT_GT(size, 0);

  // kLastTemperature is not accepted: opening with it fails with an IOError.
  options.last_level_temperature = Temperature::kLastTemperature;
  s = TryReopen(options);
  ASSERT_TRUE(s.IsIOError());
}
// Cross-checks the LAST_LEVEL_* / NON_LAST_LEVEL_* read tickers against the
// per-temperature read tickers. The last level is warm; non-last-level files
// are made hot either at write time (default_write_temperature) or at read
// time (default_temperature), depending on the loop variable.
TEST_F(DBTest2, LastLevelStatistics) {
  for (bool write_time_default : {false, true}) {
    SCOPED_TRACE("write time default? " + std::to_string(write_time_default));

    Options options = CurrentOptions();
    options.last_level_temperature = Temperature::kWarm;
    if (write_time_default) {
      options.default_write_temperature = Temperature::kHot;
      ASSERT_EQ(options.default_temperature, Temperature::kUnknown);
    } else {
      options.default_temperature = Temperature::kHot;
      ASSERT_EQ(options.default_write_temperature, Temperature::kUnknown);
    }
    options.level0_file_num_compaction_trigger = 2;
    options.level_compaction_dynamic_level_bytes = true;
    options.statistics = CreateDBStatistics();
    BlockBasedTableOptions table_opts;
    table_opts.no_block_cache = true;
    options.table_factory.reset(NewBlockBasedTableFactory(table_opts));

    DestroyAndReopen(options);

    // Shorthand for reading the current value of one ticker.
    const auto ticker = [&options](auto ticker_type) {
      return options.statistics->getTickerCount(ticker_type);
    };

    get_iostats_context()->Reset();
    const auto& io_by_temp =
        get_iostats_context()->file_io_stats_by_temperature;

    // Generate one non-last-level file and read from it.
    ASSERT_OK(Put("foo1", "bar"));
    ASSERT_OK(Put("bar", "bar"));
    ASSERT_OK(Flush());

    ASSERT_EQ("bar", Get("bar"));

    ASSERT_GT(ticker(NON_LAST_LEVEL_READ_BYTES), 0);
    ASSERT_GT(ticker(NON_LAST_LEVEL_READ_COUNT), 0);
    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_BYTES), ticker(HOT_FILE_READ_BYTES));
    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_COUNT), ticker(HOT_FILE_READ_COUNT));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_BYTES), 0);
    ASSERT_EQ(ticker(LAST_LEVEL_READ_COUNT), 0);

    // A second flush reaches the compaction trigger.
    ASSERT_OK(Put("foo2", "bar"));
    ASSERT_OK(Put("bar", "bar"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    // Read from the compacted (warm) data.
    ASSERT_EQ("bar", Get("bar"));

    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_BYTES), ticker(HOT_FILE_READ_BYTES));
    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_COUNT), ticker(HOT_FILE_READ_COUNT));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_BYTES), ticker(WARM_FILE_READ_BYTES));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_COUNT), ticker(WARM_FILE_READ_COUNT));

    const auto bytes_before = ticker(NON_LAST_LEVEL_READ_BYTES);
    const auto count_before = ticker(NON_LAST_LEVEL_READ_COUNT);

    // Add another non-last-level file and read from both kinds of files.
    ASSERT_OK(Put("foo3", "bar"));
    ASSERT_OK(Put("bar", "bar"));
    ASSERT_OK(Flush());

    ASSERT_EQ("bar", Get("foo1"));
    ASSERT_EQ("bar", Get("foo2"));
    ASSERT_EQ("bar", Get("foo3"));
    ASSERT_EQ("bar", Get("bar"));

    ASSERT_GT(ticker(NON_LAST_LEVEL_READ_BYTES), bytes_before);
    ASSERT_GT(ticker(NON_LAST_LEVEL_READ_COUNT), count_before);
    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_BYTES), ticker(HOT_FILE_READ_BYTES));
    ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_COUNT), ticker(HOT_FILE_READ_COUNT));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_BYTES), ticker(WARM_FILE_READ_BYTES));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_COUNT), ticker(WARM_FILE_READ_COUNT));
    // Not a strict requirement, but it shows the two sides are tracked
    // independently rather than merely mirroring each other.
    ASSERT_NE(ticker(LAST_LEVEL_READ_COUNT), ticker(NON_LAST_LEVEL_READ_COUNT));

    // Change the read-time default and start the statistics from scratch.
    options.default_temperature = Temperature::kCold;
    ASSERT_OK(options.statistics->Reset());
    Reopen(options);

    ASSERT_EQ("bar", Get("foo1"));
    ASSERT_EQ("bar", Get("foo2"));
    ASSERT_EQ("bar", Get("foo3"));
    ASSERT_EQ("bar", Get("bar"));

    if (write_time_default) {
      // The hot temperature was assigned at write time, so the non-last-level
      // reads are still expected to be accounted as hot.
      ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_BYTES),
                ticker(HOT_FILE_READ_BYTES));
      ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_COUNT),
                ticker(HOT_FILE_READ_COUNT));
      ASSERT_LT(0, ticker(HOT_FILE_READ_BYTES));
      ASSERT_EQ(0, ticker(COLD_FILE_READ_BYTES));
    } else {
      // The files were written without a temperature, so the new read-time
      // default (cold) is expected to apply to them.
      ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_BYTES),
                ticker(COLD_FILE_READ_BYTES));
      ASSERT_EQ(ticker(NON_LAST_LEVEL_READ_COUNT),
                ticker(COLD_FILE_READ_COUNT));
      ASSERT_EQ(0, ticker(HOT_FILE_READ_BYTES));
      ASSERT_LT(0, ticker(COLD_FILE_READ_BYTES));
    }

    ASSERT_EQ(ticker(LAST_LEVEL_READ_BYTES), ticker(WARM_FILE_READ_BYTES));
    ASSERT_EQ(ticker(LAST_LEVEL_READ_COUNT), ticker(WARM_FILE_READ_COUNT));
    ASSERT_NE(ticker(LAST_LEVEL_READ_COUNT), ticker(NON_LAST_LEVEL_READ_COUNT));

    // No read in this test was accounted under the "unknown" temperature.
    EXPECT_EQ(io_by_temp.unknown_non_last_level_bytes_read, 0);
    EXPECT_EQ(io_by_temp.unknown_non_last_level_read_count, 0);
    EXPECT_EQ(io_by_temp.unknown_last_level_bytes_read, 0);
    EXPECT_EQ(io_by_temp.unknown_last_level_read_count, 0);
  }
}
// With no temperatures configured, reads are accounted in the iostats
// "unknown" buckets: first under non-last-level, then, after the data has been
// moved to level 6, under last-level.
TEST_F(DBTest2, UnknownLastLevelStatistics) {
  Options opts = CurrentOptions();
  opts.statistics = CreateDBStatistics();
  BlockBasedTableOptions table_opts;
  table_opts.no_block_cache = true;
  opts.table_factory.reset(NewBlockBasedTableFactory(table_opts));
  DestroyAndReopen(opts);

  get_iostats_context()->Reset();
  const auto& io_by_temp = get_iostats_context()->file_io_stats_by_temperature;

  // Read from a freshly flushed file.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  ASSERT_EQ("bar", Get("foo"));

  EXPECT_GT(io_by_temp.unknown_non_last_level_bytes_read, 0);
  EXPECT_GT(io_by_temp.unknown_non_last_level_read_count, 0);
  EXPECT_EQ(io_by_temp.unknown_last_level_bytes_read, 0);
  EXPECT_EQ(io_by_temp.unknown_last_level_read_count, 0);

  // Push the data down to level 6 and start counting from zero again.
  const CompactRangeOptions compact_opts;
  ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
  MoveFilesToLevel(6);
  Reopen(opts);

  get_iostats_context()->Reset();
  ASSERT_EQ("bar", Get("foo"));

  EXPECT_GT(io_by_temp.unknown_last_level_bytes_read, 0);
  EXPECT_GT(io_by_temp.unknown_last_level_read_count, 0);
  EXPECT_EQ(io_by_temp.unknown_non_last_level_bytes_read, 0);
  EXPECT_EQ(io_by_temp.unknown_non_last_level_read_count, 0);
}
// Creating a checkpoint on a file system that cannot link files must request
// each SST file exactly once, using the temperature the DB reports for that
// file in GetLiveFilesStorageInfo().
TEST_F(DBTest2, CheckpointFileTemperature) {
  // File system on which LinkFile() always reports NotSupported
  // (presumably so that the checkpoint has to open the SST files instead of
  // hard-linking them -- confirm).
  class NoLinkTestFS : public FileTemperatureTestFS {
    using FileTemperatureTestFS::FileTemperatureTestFS;

    IOStatus LinkFile(const std::string&, const std::string&, const IOOptions&,
                      IODebugContext*) override {
      return IOStatus::NotSupported();
    }
  };

  auto test_fs = std::make_shared<NoLinkTestFS>(env_->GetFileSystem());
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
  Options options = CurrentOptions();
  options.last_level_temperature = Temperature::kWarm;
  options.level_compaction_dynamic_level_bytes = true;
  options.level0_file_num_compaction_trigger = 2;
  options.env = env.get();
  Reopen(options);
  // The DB uses `env`, which is owned by this function. Close the DB on every
  // exit path (including an early return from a failed ASSERT_*) so that it
  // never outlives the env. Declared after `env`, hence it runs first.
  Defer closer([&] { Close(); });

  // Two flushes reach the compaction trigger.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Plus one more flushed file that is not compacted.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("bar", "bar"));
  ASSERT_OK(Flush());
  auto size = GetSstSizeHelper(Temperature::kWarm);
  ASSERT_GT(size, 0);

  // Temperature of every live file as reported by the DB.
  std::map<uint64_t, Temperature> temperatures;
  std::vector<LiveFileStorageInfo> infos;
  ASSERT_OK(
      dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
  for (const auto& info : infos) {
    temperatures.emplace(info.file_number, info.temperature);
  }

  // Discard whatever was requested so far; only the checkpoint matters.
  test_fs->PopRequestedSstFileTemperatures();

  Checkpoint* raw_checkpoint = nullptr;
  ASSERT_OK(Checkpoint::Create(db_.get(), &raw_checkpoint));
  // Owned from here on, so it is released even if an assertion below fails.
  // Declared after `closer`, hence destroyed before the DB is closed.
  std::unique_ptr<Checkpoint> checkpoint(raw_checkpoint);
  ASSERT_OK(
      checkpoint->CreateCheckpoint(dbname_ + kFilePathSeparator + "tempcp"));

  // Two SST files, each requested once with the temperature the DB reported.
  std::vector<std::pair<uint64_t, Temperature>> requested_temps;
  test_fs->PopRequestedSstFileTemperatures(&requested_temps);
  ASSERT_EQ(requested_temps.size(), 2);
  std::set<uint64_t> distinct_requests;
  for (const auto& requested_temp : requested_temps) {
    ASSERT_EQ(temperatures.at(requested_temp.first), requested_temp.second);
    distinct_requests.insert(requested_temp.first);
  }
  ASSERT_EQ(distinct_requests.size(), requested_temps.size());

  // `checkpoint` is released and the DB is closed (in that order) by the
  // local objects going out of scope.
}
// Exercises experimental::UpdateManifestForFilesState with
// update_temperatures enabled: after the file system is told to report
// different temperatures for existing SST files, running the update and
// reopening makes the DB report the new temperatures.
TEST_F(DBTest2, FileTemperatureManifestFixup) {
  auto test_fs = std::make_shared<FileTemperatureTestFS>(env_->GetFileSystem());
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
  Options options = CurrentOptions();
  options.last_level_temperature = Temperature::kWarm;
  options.level_compaction_dynamic_level_bytes = true;
  options.level0_file_num_compaction_trigger = 2;
  options.env = env.get();
  std::vector<std::string> cfs = {"test1", "test2"};
  CreateAndReopenWithCF(cfs, options);
  // From here on `cfs` lists all three column families, default first, which
  // is the form the reopen calls below are given.
  cfs.insert(cfs.begin(), kDefaultColumnFamilyName);

  // Two flushes per column family reach the L0 compaction trigger.
  for (int cf = 0; cf < 3; ++cf) {
    ASSERT_OK(Put(cf, "a", "val"));
    ASSERT_OK(Put(cf, "c", "val"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(Put(cf, "b", "val"));
    ASSERT_OK(Put(cf, "d", "val"));
    ASSERT_OK(Flush(cf));
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // After compaction everything is reported as warm.
  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);

  // One more flushed file per column family; no compaction is awaited here.
  for (int cf = 0; cf < 3; ++cf) {
    ASSERT_OK(Put(cf, "e", "val"));
    ASSERT_OK(Flush(cf));
  }
  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);

  // Make the file system report every currently-warm file as cold.
  std::map<uint64_t, Temperature> current_temps;
  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
  for (const auto& e : current_temps) {
    if (e.second == Temperature::kWarm) {
      test_fs->OverrideSstFileTemperature(e.first, Temperature::kCold);
    }
  }
  // Reads still work, and the DB does not report any cold data yet.
  ASSERT_EQ(Get("a"), "val");
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Capture the column family descriptors while the handles are still valid;
  // they are needed by the offline manifest update below.
  std::vector<ColumnFamilyDescriptor> column_families;
  for (size_t i = 0; i < handles_.size(); ++i) {
    ColumnFamilyDescriptor cfdescriptor;
    handles_[i]->GetDescriptor(&cfdescriptor).PermitUncheckedError();
    column_families.push_back(cfdescriptor);
  }
  Close();

  experimental::UpdateManifestForFilesStateOptions update_opts;
  update_opts.update_temperatures = true;
  ASSERT_OK(experimental::UpdateManifestForFilesState(
      options, dbname_, column_families, update_opts));
  ReopenWithColumnFamilies(cfs, options);

  // The formerly warm files are now reported as cold.
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);

  // Second round: make the file system report every unknown-temperature file
  // as hot, then run the manifest update again.
  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
  for (const auto& e : current_temps) {
    if (e.second == Temperature::kUnknown) {
      test_fs->OverrideSstFileTemperature(e.first, Temperature::kHot);
    }
  }
  Close();
  ASSERT_OK(experimental::UpdateManifestForFilesState(
      options, dbname_, column_families, update_opts));
  ReopenWithColumnFamilies(cfs, options);

  // Only cold and hot data remain.
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
  Close();
}
// Injects an IOError into the WAL read path during recovery and expects the
// reopen (point-in-time recovery mode) to fail with an IOError.
TEST_F(DBTest2, PointInTimeRecoveryWithIOErrorWhileReadingWal) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value0"));
  Close();

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();

  // Armed by the first callback; only then does the second callback overwrite
  // the status passed to it.
  bool inject_error = false;
  sync_point->SetCallBack(
      "DBImpl::RecoverLogFiles:BeforeReadWal",
      [&inject_error](void* /*arg*/) { inject_error = true; });
  sync_point->SetCallBack(
      "LogReader::ReadMore:AfterReadFile", [&inject_error](void* arg) {
        if (!inject_error) {
          return;
        }
        ASSERT_NE(nullptr, arg);
        *static_cast<Status*>(arg) = Status::IOError("Injected IOError");
      });
  sync_point->EnableProcessing();

  options.avoid_flush_during_recovery = true;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  const Status reopen_status = TryReopen(options);
  ASSERT_TRUE(reopen_status.IsIOError());
}
// Creates a column family while a background flush is held at a sync point,
// turns on env_->corrupt_in_sync_ so that the flush is expected to fail, and
// then checks the DB can still be reopened with point-in-time recovery.
TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
  SyncPoint* const sync_point = SyncPoint::GetInstance();
  // Each pair is {predecessor, successor}: the test's ":1" point follows the
  // flush's "Start:1", and the flush's "Start:2" follows the test's ":2".
  sync_point->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:Start:1",
        "PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
       {"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
        "DBImpl::BackgroundCallFlush:Start:2"}});
  sync_point->EnableProcessing();

  CreateColumnFamilies({"test1"}, Options());
  ASSERT_OK(Put("foo", "bar"));

  // The flush runs on its own thread and is expected to report an error.
  port::Thread flusher([this]() { ASSERT_NOK(Flush()); });

  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
  CreateColumnFamilies({"test2"}, Options());
  env_->corrupt_in_sync_ = true;
  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
  flusher.join();
  env_->corrupt_in_sync_ = false;
  sync_point->DisableProcessing();

  // Reopening with all three column families must succeed.
  Options reopen_options = CurrentOptions();
  reopen_options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  ReopenWithColumnFamilies({"default", "test1", "test2"}, reopen_options);
}
// With a single level and universal compaction, checks the order and epoch
// numbers of L0 files after one flush plus two ingested external files, and
// again after a full manual compaction.
TEST_F(DBTest2, SortL0FilesByEpochNumber) {
  Options options = CurrentOptions();
  options.num_levels = 1;
  options.compaction_style = kCompactionStyleUniversal;
  DestroyAndReopen(options);

  // First memtable entry; flushed further below together with "key4".
  ASSERT_OK(Put("key1", "seq1"));

  // Prepare two external SST files holding one key each.
  const std::string ingest_path_a = dbname_ + "/test_files1.sst";
  const std::string ingest_path_b = dbname_ + "/test_files2.sst";
  SstFileWriter writer{EnvOptions(), options};
  ASSERT_OK(writer.Open(ingest_path_a));
  ASSERT_OK(writer.Put("key2", "seq0"));
  ASSERT_OK(writer.Finish());
  ASSERT_OK(writer.Open(ingest_path_b));
  ASSERT_OK(writer.Put("key3", "seq0"));
  ASSERT_OK(writer.Finish());

  ASSERT_OK(Put("key4", "seq2"));
  ASSERT_OK(Flush());

  ColumnFamilyHandle* const default_cf = db_->DefaultColumnFamily();
  ASSERT_OK(db_->IngestExternalFile(default_cf, {ingest_path_a, ingest_path_b},
                                    IngestExternalFileOptions()));

  const std::vector<FileMetaData*> before_compaction = GetLevelFileMetadatas(0);
  ASSERT_EQ(before_compaction.size(), 3);

  // Index 0: the file holding "key3", with the highest epoch number.
  const FileMetaData* const first = before_compaction[0];
  EXPECT_EQ(first->epoch_number, 3);
  EXPECT_EQ(first->fd.largest_seqno, 0);
  ASSERT_EQ(first->num_entries, 1);
  ASSERT_TRUE(first->largest.user_key() == Slice("key3"));

  // Index 1: the file holding "key2".
  const FileMetaData* const second = before_compaction[1];
  EXPECT_EQ(second->epoch_number, 2);
  EXPECT_EQ(second->fd.largest_seqno, 0);
  ASSERT_EQ(second->num_entries, 1);
  ASSERT_TRUE(second->largest.user_key() == Slice("key2"));

  // Index 2: the flushed file spanning "key1".."key4", with the lowest epoch.
  const FileMetaData* const third = before_compaction[2];
  EXPECT_EQ(third->epoch_number, 1);
  EXPECT_EQ(third->fd.largest_seqno, 2);
  ASSERT_EQ(third->num_entries, 2);
  ASSERT_TRUE(third->largest.user_key() == Slice("key4"));
  ASSERT_TRUE(third->smallest.user_key() == Slice("key1"));

  // A full manual compaction leaves one file with all four entries.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  const std::vector<FileMetaData*> after_compaction = GetLevelFileMetadatas(0);
  ASSERT_EQ(after_compaction.size(), 1);
  const FileMetaData* const merged = after_compaction[0];
  EXPECT_EQ(merged->epoch_number, 1);
  ASSERT_EQ(merged->num_entries, 4);
  ASSERT_TRUE(merged->largest.user_key() == Slice("key4"));
  ASSERT_TRUE(merged->smallest.user_key() == Slice("key1"));
}
// Moves a single file from L1 back to L0 via CompactRange with change_level
// and checks that its epoch number is unchanged.
TEST_F(DBTest2, SameEpochNumberAfterCompactRangeChangeLevel) {
  Options options = CurrentOptions();
  options.num_levels = 7;
  options.compaction_style = CompactionStyle::kCompactionStyleLevel;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Produce one file and place it on level 1.
  ASSERT_OK(Put("key1", "seq1"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1, 0);

  const std::vector<FileMetaData*> l0_before = GetLevelFileMetadatas(0);
  ASSERT_EQ(l0_before.size(), 0);
  const std::vector<FileMetaData*> l1_before = GetLevelFileMetadatas(1);
  ASSERT_EQ(l1_before.size(), 1);
  const std::vector<FileMetaData*> l2_before = GetLevelFileMetadatas(2);
  ASSERT_EQ(l2_before.size(), 0);
  ASSERT_EQ(l1_before[0]->epoch_number, 1);

  // Manual compaction that targets level 0.
  CompactRangeOptions to_level0;
  to_level0.change_level = true;
  to_level0.target_level = 0;
  ASSERT_OK(db_->CompactRange(to_level0, nullptr, nullptr));

  const std::vector<FileMetaData*> l0_after = GetLevelFileMetadatas(0);
  const std::vector<FileMetaData*> l1_after = GetLevelFileMetadatas(1);
  ASSERT_EQ(l0_after.size(), 1);
  ASSERT_EQ(l1_after.size(), 0);

  const FileMetaData* const moved = l0_after[0];
  EXPECT_EQ(moved->epoch_number, 1);
  ASSERT_EQ(moved->num_entries, 1);
  ASSERT_TRUE(moved->largest.user_key() == Slice("key1"));
}
// For allow_ingest_behind on and off: writes files into the default column
// family (L0 and L1) and into "cf1", checks their epoch numbers and each
// column family's next epoch number, then reopens the DB and checks that the
// same values are observed again.
TEST_F(DBTest2, RecoverEpochNumber) {
  for (bool allow_ingest_behind : {true, false}) {
    Options options = CurrentOptions();
    options.allow_ingest_behind = allow_ingest_behind;
    options.num_levels = 7;
    options.compaction_style = kCompactionStyleLevel;
    options.disable_auto_compactions = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"cf1"}, options);

    // Every expected epoch number below is shifted by the reserved epoch
    // number when allow_ingest_behind is set, and unshifted otherwise.
    const auto epoch_shift =
        allow_ingest_behind ? kReservedEpochNumberForFileIngestedBehind : 0;

    VersionSet* versions = dbfull()->GetVersionSet();
    assert(versions);
    const ColumnFamilyData* default_cfd =
        versions->GetColumnFamilySet()->GetDefault();
    const ColumnFamilyData* cf1_cfd =
        versions->GetColumnFamilySet()->GetColumnFamily("cf1");

    // Default column family: first file goes to L1, second file stays in L0.
    ASSERT_OK(Put("key1", "epoch1"));
    ASSERT_OK(Flush());
    MoveFilesToLevel(1, 0);
    ASSERT_OK(Put("key2", "epoch2"));
    ASSERT_OK(Flush());

    const std::vector<FileMetaData*> l0_files = GetLevelFileMetadatas(0);
    ASSERT_EQ(l0_files.size(), 1);
    ASSERT_EQ(l0_files[0]->epoch_number, 2 + epoch_shift);
    ASSERT_EQ(l0_files[0]->num_entries, 1);
    ASSERT_TRUE(l0_files[0]->largest.user_key() == Slice("key2"));

    const std::vector<FileMetaData*> l1_files = GetLevelFileMetadatas(1);
    ASSERT_EQ(l1_files.size(), 1);
    ASSERT_EQ(l1_files[0]->epoch_number, 1 + epoch_shift);
    ASSERT_EQ(l1_files[0]->num_entries, 1);
    ASSERT_TRUE(l1_files[0]->largest.user_key() == Slice("key1"));

    // "cf1": a single L0 file.
    ASSERT_OK(Put(1, "cf1_key1", "epoch1"));
    ASSERT_OK(Flush(1));

    const std::vector<FileMetaData*> cf1_l0_files = GetLevelFileMetadatas(0, 1);
    ASSERT_EQ(cf1_l0_files.size(), 1);
    ASSERT_EQ(cf1_l0_files[0]->epoch_number, 1 + epoch_shift);
    ASSERT_EQ(cf1_l0_files[0]->num_entries, 1);
    ASSERT_TRUE(cf1_l0_files[0]->largest.user_key() == Slice("cf1_key1"));

    ASSERT_EQ(default_cfd->GetNextEpochNumber(), 3 + epoch_shift);
    ASSERT_EQ(cf1_cfd->GetNextEpochNumber(), 2 + epoch_shift);

    // Reopen and verify the same state. The VersionSet and column family
    // pointers are fetched again after the reopen.
    ReopenWithColumnFamilies({"default", "cf1"}, options);
    versions = dbfull()->GetVersionSet();
    assert(versions);
    default_cfd = versions->GetColumnFamilySet()->GetDefault();
    cf1_cfd = versions->GetColumnFamilySet()->GetColumnFamily("cf1");

    const std::vector<FileMetaData*> l0_recovered = GetLevelFileMetadatas(0);
    ASSERT_EQ(l0_recovered.size(), 1);
    EXPECT_EQ(l0_recovered[0]->epoch_number, 2 + epoch_shift);
    ASSERT_EQ(l0_recovered[0]->num_entries, 1);
    ASSERT_TRUE(l0_recovered[0]->largest.user_key() == Slice("key2"));

    const std::vector<FileMetaData*> l1_recovered = GetLevelFileMetadatas(1);
    ASSERT_EQ(l1_recovered.size(), 1);
    EXPECT_EQ(l1_recovered[0]->epoch_number, 1 + epoch_shift);
    ASSERT_EQ(l1_recovered[0]->num_entries, 1);
    ASSERT_TRUE(l1_recovered[0]->largest.user_key() == Slice("key1"));

    const std::vector<FileMetaData*> cf1_l0_recovered =
        GetLevelFileMetadatas(0, 1);
    ASSERT_EQ(cf1_l0_recovered.size(), 1);
    EXPECT_EQ(cf1_l0_recovered[0]->epoch_number, 1 + epoch_shift);
    ASSERT_EQ(cf1_l0_recovered[0]->num_entries, 1);
    ASSERT_TRUE(cf1_l0_recovered[0]->largest.user_key() == Slice("cf1_key1"));

    EXPECT_EQ(default_cfd->GetNextEpochNumber(), 3 + epoch_shift);
    EXPECT_EQ(cf1_cfd->GetNextEpochNumber(), 2 + epoch_shift);
  }
}
// Renames the directory of a closed DB and checks that the DB opens from the
// new path with its data intact.
TEST_F(DBTest2, RenameDirectory) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value0"));
  Close();

  const std::string original_path = dbname_;
  const std::string renamed_path = original_path + "_2";
  EXPECT_OK(env_->RenameFile(original_path, renamed_path));

  // The DB must already exist at the new location, so do not create one.
  options.create_if_missing = false;
  dbname_ = renamed_path;
  ASSERT_OK(TryReopen(options));
  ASSERT_EQ("value0", Get("foo"));

  // Destroy the renamed DB, then point the fixture back at the original path.
  Destroy(options);
  dbname_ = original_path;
}
// Counts, via sync points, how many table opens skipped versus passed unique
// ID verification under different settings of
// verify_sst_unique_id_in_manifest, including a file whose unique ID is
// zeroed out when it is encoded into a version edit.
TEST_F(DBTest2, SstUniqueIdVerifyBackwardCompatible) {
  const int kNumSst = 3;
  const int kLevel0Trigger = 4;
  auto options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.statistics = CreateDBStatistics();
  // No verification on the first open.
  options.verify_sst_unique_id_in_manifest = false;
  Reopen(options);

  std::atomic_int skipped = 0;
  std::atomic_int passed = 0;
  // The callbacks registered below capture `skipped` and `passed` by
  // reference, and the DB stays open after this test body returns. Disable
  // and drop the callbacks before the counters go out of scope. The guard is
  // declared after the counters so that it is destroyed first, and it also
  // runs when a fatal assertion returns early.
  Defer unregister_callbacks([]() {
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::SkippedVerifyUniqueId",
      [&](void* /*arg*/) { skipped++; });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::PassedVerifyUniqueId",
      [&](void* /*arg*/) { passed++; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Generate a few SST files.
  for (int i = 0; i < kNumSst; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }

  // With verification off, every file is counted as skipped.
  EXPECT_EQ(skipped, kNumSst);
  EXPECT_EQ(passed, 0);

  // Reopen with verification on: every existing file passes.
  options.verify_sst_unique_id_in_manifest = true;
  skipped = 0;
  passed = 0;
  Reopen(options);
  EXPECT_EQ(skipped, 0);
  EXPECT_EQ(passed, kNumSst);

  // From now on, zero out the unique ID handed to this sync point.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionEdit::EncodeTo:UniqueId", [&](void* arg) {
        auto unique_id = static_cast<UniqueId64x2*>(arg);
        (*unique_id)[0] = 0;
        (*unique_id)[1] = 0;
      });

  // Write enough additional files to reach the L0 compaction trigger.
  for (int i = kNumSst; i < kLevel0Trigger; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,1", FilesPerLevel(0));

  // Reopen with verification still on: the single remaining file is counted
  // as skipped rather than passed.
  ASSERT_TRUE(options.verify_sst_unique_id_in_manifest);
  skipped = 0;
  passed = 0;
  Reopen(options);
  EXPECT_EQ(skipped, 1);
  EXPECT_EQ(passed, 0);
}
// Replaces db_session_id in the table properties of every newly written SST
// with a freshly generated one, then expects reopening with
// verify_sst_unique_id_in_manifest enabled to fail with Corruption, both
// before and after a compaction.
TEST_F(DBTest2, SstUniqueIdVerify) {
  const int kNumSst = 3;
  const int kLevel0Trigger = 4;
  auto options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  // No verification on the first open.
  options.verify_sst_unique_id_in_manifest = false;
  Reopen(options);

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "PropertyBlockBuilder::AddTableProperty:Start", [](void* props_vs) {
        auto* table_props = static_cast<TableProperties*>(props_vs);
        // Overwrite the session ID with a newly generated one.
        table_props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
      });
  sync_point->EnableProcessing();

  // Generate a few SST files while the callback is active.
  for (int file = 0; file < kNumSst; file++) {
    for (int k = 0; k < 100; k++) {
      ASSERT_OK(Put(Key(file * 10 + k), "value"));
    }
    ASSERT_OK(Flush());
  }

  // Reopening with verification must be rejected.
  options.verify_sst_unique_id_in_manifest = true;
  const Status first_attempt = TryReopen(options);
  ASSERT_TRUE(first_attempt.IsCorruption());

  // Reopening without verification works.
  options.verify_sst_unique_id_in_manifest = false;
  Reopen(options);

  // Write enough additional files to reach the L0 compaction trigger.
  for (int file = kNumSst; file < kLevel0Trigger; file++) {
    for (int k = 0; k < 100; k++) {
      ASSERT_OK(Put(Key(file * 10 + k), "value"));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,1", FilesPerLevel(0));

  // The compacted file is rejected as well.
  options.verify_sst_unique_id_in_manifest = true;
  const Status second_attempt = TryReopen(options);
  ASSERT_TRUE(second_attempt.IsCorruption());
}
// With three column families, only the files written to column family 1 get
// a replaced db_session_id. Reopening with verification enabled is expected
// to fail with Corruption.
TEST_F(DBTest2, SstUniqueIdVerifyMultiCFs) {
  const int kNumSst = 3;
  const int kLevel0Trigger = 4;
  auto options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  // No verification on the first open.
  options.verify_sst_unique_id_in_manifest = false;
  CreateAndReopenWithCF({"one", "two"}, options);

  // Column families 0 and 2 are written before the callback is installed.
  for (int cf_index : {0, 2}) {
    for (int file = 0; file < kNumSst; file++) {
      for (int k = 0; k < 100; k++) {
        ASSERT_OK(Put(cf_index, Key(file * 10 + k), "value"));
      }
      ASSERT_OK(Flush(cf_index));
    }
  }

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "PropertyBlockBuilder::AddTableProperty:Start", [](void* props_vs) {
        auto* table_props = static_cast<TableProperties*>(props_vs);
        // Overwrite the session ID with a newly generated one.
        table_props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
      });
  sync_point->EnableProcessing();

  // Column family 1 is written while the callback is active.
  for (int file = 0; file < kNumSst; file++) {
    for (int k = 0; k < 100; k++) {
      ASSERT_OK(Put(1, Key(file * 10 + k), "value"));
    }
    ASSERT_OK(Flush(1));
  }

  // Reopening with verification must be rejected.
  options.verify_sst_unique_id_in_manifest = true;
  const Status reopen_status =
      TryReopenWithColumnFamilies({"default", "one", "two"}, options);
  ASSERT_TRUE(reopen_status.IsCorruption());
}
// For each position k among the L0 files, writes the files so that the
// db_session_id replacement starts with the k-th flush, then checks that:
//  - a normal reopen with unique ID verification fails with Corruption,
//  - a best-efforts-recovery reopen succeeds and exposes only the values
//    written before file k (or nothing when k == 0),
//  - the DB can afterwards be reopened normally and written to again.
TEST_F(DBTest2, BestEffortsRecoveryWithSstUniqueIdVerification) {
  SyncPoint* const sync_point = SyncPoint::GetInstance();

  // Sync point callback that replaces the session ID in the table properties.
  const auto replace_session_id = [](void* arg) {
    auto* table_props = static_cast<TableProperties*>(arg);
    assert(table_props);
    table_props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
  };

  // Iterates the whole DB expecting keys "0", "1", ... that all map to
  // `expected_v`, and `expected_count` entries in total.
  const auto verify_contents = [&](size_t expected_count,
                                   const std::string& expected_v) {
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    size_t seen = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++seen) {
      ASSERT_EQ(std::to_string(seen), iter->key());
      ASSERT_EQ(expected_v, iter->value());
    }
    EXPECT_OK(iter->status());
    ASSERT_EQ(expected_count, seen);
  };

  const int num_l0_compaction_trigger = 8;
  // Stay one file below the trigger.
  const int num_l0 = num_l0_compaction_trigger - 1;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = num_l0_compaction_trigger;

  for (int k = 0; k < num_l0; ++k) {
    // Allow mismatch for now.
    options.verify_sst_unique_id_in_manifest = false;
    DestroyAndReopen(options);

    constexpr size_t num_keys_per_file = 10;
    for (int file = 0; file < num_l0; ++file) {
      for (size_t key = 0; key < num_keys_per_file; ++key) {
        ASSERT_OK(Put(std::to_string(key), "v" + std::to_string(file)));
      }
      if (file == k) {
        // Install the callback right before flushing file k.
        sync_point->DisableProcessing();
        sync_point->SetCallBack("PropertyBlockBuilder::AddTableProperty:Start",
                                replace_session_id);
        sync_point->EnableProcessing();
      }
      ASSERT_OK(Flush());
    }

    // A normal reopen with verification is rejected.
    options.verify_sst_unique_id_in_manifest = true;
    const Status strict_reopen = TryReopen(options);
    ASSERT_TRUE(strict_reopen.IsCorruption());

    // What is expected to remain visible after best-efforts recovery. The
    // value string is not compared when the expected count is zero.
    const size_t surviving_keys = k == 0 ? 0 : num_keys_per_file;
    const std::string surviving_value = "v" + std::to_string(k - 1);

    options.best_efforts_recovery = true;
    Reopen(options);
    verify_contents(surviving_keys, surviving_value);

    // Reopen with regular recovery; the same contents are expected.
    options.best_efforts_recovery = false;
    Reopen(options);
    verify_contents(surviving_keys, surviving_value);

    sync_point->DisableProcessing();
    sync_point->ClearAllCallBacks();

    // With the callback gone, new writes survive another reopen.
    for (size_t key = 0; key < num_keys_per_file; ++key) {
      ASSERT_OK(Put(std::to_string(key), "v"));
    }
    ASSERT_OK(Flush());
    Reopen(options);
    for (size_t key = 0; key < num_keys_per_file; ++key) {
      ASSERT_EQ("v", Get(std::to_string(key)));
    }
  }
}
// Writes 100 keys with a fixed u64 timestamp, flushes, and then checks that
// DBImpl::GetLatestSequenceForKey (called with cache_only = true) finds each
// key and returns that timestamp, while the GET_HIT_L0 ticker stays at zero.
TEST_F(DBTest2, GetLatestSeqAndTsForKey) {
  Destroy(last_options_);

  Options options = CurrentOptions();
  options.max_write_buffer_size_to_maintain = 64 << 10;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  options.statistics = CreateDBStatistics();
  Reopen(options);

  constexpr uint64_t kTsU64Value = 12;
  constexpr uint64_t kNumKeys = 100;

  // Fixed64 encoding of the timestamp used for every write.
  std::string encoded_ts;
  PutFixed64(&encoded_ts, kTsU64Value);

  // Keys are the byte-reversed fixed64 encoding of the key index.
  // NOTE(review): presumably reversed to obtain a big-endian byte order --
  // confirm against PutFixed64.
  const auto encode_key = [](uint64_t index) {
    std::string encoded;
    PutFixed64(&encoded, index);
    std::reverse(encoded.begin(), encoded.end());
    return encoded;
  };

  for (uint64_t index = 0; index < kNumKeys; ++index) {
    ASSERT_OK(db_->Put(WriteOptions(), encode_key(index), encoded_ts, "value"));
  }
  ASSERT_OK(Flush());

  constexpr bool cache_only = true;
  constexpr SequenceNumber lower_bound_seq = 0;
  auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(
      dbfull()->DefaultColumnFamily());
  assert(cfhi);
  assert(cfhi->cfd());
  SuperVersion* sv = cfhi->cfd()->GetSuperVersion();

  for (uint64_t index = 0; index < kNumKeys; ++index) {
    const std::string key_str = encode_key(index);
    std::string found_ts;
    SequenceNumber seq = kMaxSequenceNumber;
    bool found_record_for_key = false;
    bool is_blob_index = false;

    const Status lookup_status = dbfull()->GetLatestSequenceForKey(
        sv, key_str, cache_only, lower_bound_seq, &seq, &found_ts,
        &found_record_for_key, &is_blob_index);
    ASSERT_OK(lookup_status);
    ASSERT_EQ(encoded_ts, found_ts);
    ASSERT_TRUE(found_record_for_key);
    ASSERT_FALSE(is_blob_index);
  }

  // None of the lookups above may have been counted as an L0 hit.
  ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0));
}
#if defined(ZSTD)
// With ZSTD compression and compression_opts.checksum enabled, modifies the
// last byte of each compressed block through a sync point and expects the
// damage to be reported as Corruption: on read by default, and already at
// flush time when paranoid_file_checks is on.
TEST_F(DBTest2, ZSTDChecksum) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.compression = kZSTD;
  options.compression_opts.max_compressed_bytes_per_kb = 1024;
  options.compression_opts.checksum = true;
  DestroyAndReopen(options);

  Random rnd(33);
  ASSERT_OK(Put(Key(0), rnd.RandomString(4 << 10)));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
      [](void* arg) {
        auto* compressed = static_cast<std::string*>(arg);
        // Bump the final byte of the compressed output.
        ++compressed->back();
      });
  sync_point->EnableProcessing();

  // The flush itself succeeds; the damage shows up when reading the key.
  ASSERT_OK(Flush());
  PinnableSlice val;
  const Status read_status = Get(Key(0), &val);
  ASSERT_TRUE(read_status.IsCorruption());

  // With paranoid file checks the flush reports the corruption directly.
  options.paranoid_file_checks = true;
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), rnd.RandomString(4 << 10)));
  const Status flush_status = Flush();
  ASSERT_TRUE(flush_status.IsCorruption());
}
#endif
// With a zero-capacity table cache, a Get restricted to kBlockCacheTier must
// return Incomplete and must not increase the NO_FILE_OPENS ticker.
TEST_F(DBTest2, TableCacheMissDuringReadFromBlockCacheTier) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  Reopen(options);

  // Shrink the table cache to zero capacity.
  dbfull()->TEST_table_cache()->SetCapacity(0);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  const uint64_t file_opens_before = TestGetTickerCount(options, NO_FILE_OPENS);

  ReadOptions block_cache_only;
  block_cache_only.read_tier = kBlockCacheTier;
  std::string value;
  const Status get_status = db_->Get(block_cache_only, "foo", &value);
  ASSERT_TRUE(get_status.IsIncomplete());

  ASSERT_EQ(file_opens_before, TestGetTickerCount(options, NO_FILE_OPENS));
}
// Builds a small DB with CRC32c file checksums (including a column family
// that is created, written and dropped), closes it, and checks that
// experimental::GetFileChecksumsFromCurrentManifest, run on a read-only file
// system, returns exactly the checksums reported for the live files.
TEST_F(DBTest2, GetFileChecksumsFromCurrentManifest_CRC32) {
  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  opts.level0_file_num_compaction_trigger = 10;

  std::unique_ptr<DB> db;
  std::string dbname = test::PerThreadDBPath("file_chksum");
  ASSERT_OK(DB::Open(opts, dbname, &db));

  WriteOptions wopts;
  FlushOptions fopts;
  fopts.wait = true;
  Random rnd(test::RandomSeed());
  // Four flushed files, which stays below the L0 compaction trigger set above.
  for (int i = 0; i < 4; i++) {
    ASSERT_OK(db->Put(wopts, Key(i), rnd.RandomString(100)));
    ASSERT_OK(db->Flush(fopts));
  }

  // Create a column family, give it one flushed file, and drop it again.
  ColumnFamilyHandle* cf = nullptr;
  ASSERT_OK(
      db->CreateColumnFamily(ColumnFamilyOptions(), "soon_to_be_deleted", &cf));
  // Own the handle so it is released even if an assertion below returns early.
  // Declared after `db`, so it is destroyed before the DB.
  std::unique_ptr<ColumnFamilyHandle> cf_guard(cf);
  ASSERT_OK(db->Put(wopts, cf, "some_key", "some_value"));
  ASSERT_OK(db->Flush(fopts, cf));
  ASSERT_OK(db->DropColumnFamily(cf));
  cf_guard.reset();
  cf = nullptr;

  // Remember what the live DB reports, then close it.
  std::vector<LiveFileMetaData> live_files;
  db->GetLiveFilesMetaData(&live_files);
  ASSERT_OK(db->Close());
  db.reset();

  std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
  // Validate the list before handing it to the API under test.
  ASSERT_TRUE(checksum_list != nullptr);

  // Read the checksums back from the manifest through a read-only file system.
  auto read_only_fs =
      std::make_shared<ReadOnlyFileSystem>(env_->GetFileSystem());
  ASSERT_OK(experimental::GetFileChecksumsFromCurrentManifest(
      read_only_fs.get(), dbname, checksum_list.get()));

  std::vector<uint64_t> file_numbers;
  std::vector<std::string> checksums;
  std::vector<std::string> checksum_func_names;
  ASSERT_OK(checksum_list->GetAllFileChecksums(&file_numbers, &checksums,
                                               &checksum_func_names));

  // Every live file must be present with a matching checksum and function
  // name, and the list must contain nothing else.
  ASSERT_EQ(live_files.size(), checksum_list->size());
  for (size_t i = 0; i < live_files.size(); i++) {
    std::string stored_checksum;
    std::string stored_func_name;
    ASSERT_OK(checksum_list->SearchOneFileChecksum(
        live_files[i].file_number, &stored_checksum, &stored_func_name));
    ASSERT_EQ(live_files[i].file_checksum, stored_checksum);
    ASSERT_EQ(live_files[i].file_checksum_func_name, stored_func_name);
  }
}
}
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest, registers custom objects, and runs all tests.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}