#include <atomic>
#include <limits>
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
// Replacement value that ConditionalUpdateFilter::Filter writes into
// `new_value` for keys that compare below the filter's threshold key.
static std::string NEW_VALUE = "NewValue";
// Base fixture for the flush tests in this file. It adds no state of its own;
// it only forwards a fixed name ("db_flush_test") and a `true` flag to
// DBTestBase.
// NOTE(review): the second argument presumably selects fsync behaviour in the
// test env -- confirm against DBTestBase's constructor.
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("db_flush_test", true) {}
};
// Bool-parameterized flavour of DBFlushTest. The fixture carries no extra
// state, so the compiler-generated default constructor is sufficient.
class DBFlushDirectIOTest : public DBFlushTest,
                            public ::testing::WithParamInterface<bool> {
 public:
  DBFlushDirectIOTest() = default;
};
// Bool-parameterized flavour of DBFlushTest used by the atomic-flush tests.
// The fixture carries no extra state, so the compiler-generated default
// constructor is sufficient.
class DBAtomicFlushTest : public DBFlushTest,
                          public ::testing::WithParamInterface<bool> {
 public:
  DBAtomicFlushTest() = default;
};
// Issues two non-blocking flushes. The second one is requested only after the
// sync point ordered behind "VersionSet::LogAndApply:WriteManifest" has been
// released. The test then waits for all flushes and expects two table files
// in total.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options opts;
  opts.env = env_;
  opts.max_background_flushes = 2;
  opts.disable_auto_compactions = true;
  Reopen(opts);

  FlushOptions async_flush;
  async_flush.allow_write_stall = true;
  async_flush.wait = false;

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::TryInstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  sync_point->EnableProcessing();

  // First flush runs in the background.
  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(async_flush));

  // Proceed only once the first dependency pair allows it.
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");

  // Second flush is requested while the first one is still in flight.
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(async_flush));

  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(2, TotalTableFiles());
}
// Deactivates the fault-injection filesystem while a background flush is
// pending, lets the flush pass through the "DBImpl::SyncClosedWals" sync
// points, and expects the flush to report an error and leave no files in any
// level.
TEST_F(DBFlushTest, SyncFail) {
  auto faulty_env = std::make_unique<FaultInjectionTestEnv>(env_);

  Options opts;
  opts.env = faulty_env.get();
  opts.disable_auto_compactions = true;

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedWals:Start"},
       {"DBImpl::SyncClosedWals:Failed", "DBFlushTest::SyncFail:2"}});
  sync_point->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, opts);
  ASSERT_OK(Put("key", "value"));

  FlushOptions async_flush;
  async_flush.wait = false;
  ASSERT_OK(dbfull()->Flush(async_flush));

  // The filesystem is inactive between sync points :1 and :2.
  faulty_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  faulty_env->SetFilesystemActive(true);

  // The background flush must surface a non-OK status...
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
  // ...and must not have installed any file.
  ASSERT_EQ("", FilesPerLevel());
  Destroy(opts);
}
// Runs a non-blocking flush with the test thread bracketed around the
// "DBImpl::SyncClosedWals:Skip" sync point and expects the flush to succeed.
TEST_F(DBFlushTest, SyncSkip) {
  Options opts = CurrentOptions();

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedWals:Skip"},
       {"DBImpl::SyncClosedWals:Skip", "DBFlushTest::SyncSkip:2"}});
  sync_point->EnableProcessing();

  Reopen(opts);
  ASSERT_OK(Put("key", "value"));

  FlushOptions async_flush;
  async_flush.wait = false;
  ASSERT_OK(dbfull()->Flush(async_flush));

  // :1 must be reached before the Skip point; :2 waits until after it.
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");

  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  Destroy(opts);
}
// With the HIGH-priority pool sized to zero threads, checks that every
// "DBImpl::BGWorkFlush" and "DBImpl::BGWorkCompaction" callback fires on one
// and the same thread, and that four flushes and one compaction are observed.
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = 4;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  Reopen(opts);
  env_->SetBackgroundThreads(0, Env::HIGH);

  // Thread on which the first flush ran; default-constructed until then.
  std::thread::id worker_thread;
  int num_flushes = 0;
  int num_compactions = 0;

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack("DBImpl::BGWorkFlush", [&](void* /*arg*/) {
    const std::thread::id current = std::this_thread::get_id();
    if (worker_thread != std::thread::id()) {
      ASSERT_EQ(worker_thread, current);
    } else {
      worker_thread = current;
    }
    ++num_flushes;
  });
  sync_point->SetCallBack("DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
    ASSERT_EQ(worker_thread, std::this_thread::get_id());
    ++num_compactions;
  });
  sync_point->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int round = 0; round < 4; ++round) {
    ASSERT_OK(Put("key", "val"));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}
// Occupies the single LOW-priority thread with a sleeping task, writes enough
// to exceed max_total_wal_size, and closes the DB. No flush may have run by
// then. After reopening, an explicit flush must run exactly once.
TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) {
  Options opts = CurrentOptions();
  opts.max_background_flushes = 1;
  opts.max_total_wal_size = 8192;

  DestroyAndReopen(opts);
  CreateColumnFamilies({"cf1", "cf2"}, opts);

  env_->SetBackgroundThreads(0, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask low_pri_blocker;

  auto* const sync_point = SyncPoint::GetInstance();

  int num_flushes = 0;
  sync_point->SetCallBack("DBImpl::BGWorkFlush",
                          [&](void* /*arg*/) { ++num_flushes; });

  int low_unscheduled = 0;
  sync_point->SetCallBack("DBImpl::UnscheduleLowFlushCallback",
                          [&](void* /*arg*/) {
                            ++low_unscheduled;
                            // Fails if this callback fires more than once.
                            ASSERT_EQ(low_unscheduled, 1);
                          });

  int high_unscheduled = 0;
  sync_point->SetCallBack("DBImpl::UnscheduleHighFlushCallback",
                          [&](void* /*arg*/) {
                            ++high_unscheduled;
                            // Fails on any invocation of this callback.
                            ASSERT_EQ(high_unscheduled, 0);
                          });
  sync_point->EnableProcessing();

  ASSERT_OK(Put(0, "key1", DummyString(8192)));

  // Park the only LOW-priority thread before the next write.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &low_pri_blocker,
                 Env::Priority::LOW);
  low_pri_blocker.WaitUntilSleeping();

  ASSERT_OK(Put(0, "key2", DummyString(8192)));

  Close();
  low_pri_blocker.WakeUp();
  low_pri_blocker.WaitUntilDone();
  ASSERT_EQ(0, num_flushes);

  ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, opts));
  ASSERT_OK(Put(0, "key3", DummyString(8192)));
  ASSERT_OK(Flush(0));
  ASSERT_EQ(1, num_flushes);
}
// Starts a manual Flush() on a helper thread. Once the background flush work
// has begun, the test adds another key and switches the memtable, and only
// then lets the flush job proceed to "FlushJob::WriteLevel0Table". The manual
// flush must still complete so the helper thread can be joined.
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options opts = CurrentOptions();
  opts.write_buffer_size = 100;
  opts.max_write_buffer_number = 4;
  opts.min_write_buffer_number_to_merge = 3;
  Reopen(opts);

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  sync_point->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread manual_flush([this]() { ASSERT_OK(Flush()); });

  // Wait for the background flush work to start.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");

  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());

  // Allow the flush job to write its level-0 table.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  manual_flush.join();
}
// Hooks "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0" and checks
// that, for one manual flush, the hook fires exactly once and reports zero
// unscheduled flushes.
TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  Options opts = CurrentOptions();
  Reopen(opts);

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();

  int hook_calls = 0;
  sync_point->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
      [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        // The sync point passes a pointer to an int counter.
        const int unscheduled_flushes = *static_cast<int*>(arg);
        ASSERT_EQ(0, unscheduled_flushes);
        ++hook_calls;
      });
  sync_point->EnableProcessing();

  ASSERT_OK(Put("a", "foo"));
  FlushOptions default_flush;
  ASSERT_OK(dbfull()->Flush(default_flush));
  ASSERT_EQ(1, hook_calls);

  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
}
// Overwrites three keys many times in one memtable and checks the
// MEMTABLE_PAYLOAD_BYTES_AT_FLUSH / MEMTABLE_GARBAGE_BYTES_AT_FLUSH tickers
// after a flush. Each entry is accounted as key bytes + value bytes +
// sizeof(uint64_t). All overwritten entries are expected to be garbage; the
// payload additionally includes the three final entries.
TEST_F(DBFlushTest, StatisticsGarbageBasic) {
  Options opts = CurrentOptions();
  opts.statistics = CreateDBStatistics();
  opts.statistics->set_stats_level(StatsLevel::kAll);
  opts.create_if_missing = true;
  opts.compression = kNoCompression;
  opts.inplace_update_support = false;
  opts.allow_concurrent_memtable_write = true;
  opts.write_buffer_size = 64 << 20;
  ASSERT_OK(TryReopen(opts));

  const size_t kRounds = 2000;
  const size_t kRandomValueLen = 172;
  const std::vector<std::string> keys = {"key1", "key2", "key3"};
  const std::vector<std::string> final_values = {"value1", "value2", "value3"};

  // Bytes accounted for one key/value entry.
  const auto entry_bytes = [](const std::string& k,
                              const std::string& v) -> uint64_t {
    return k.size() + v.size() + sizeof(uint64_t);
  };

  uint64_t expected_garbage_bytes = 0;
  Random rnd(301);
  for (size_t round = 0; round < kRounds; ++round) {
    // Draw all values first, then write them, then account for them.
    std::vector<std::string> round_values;
    for (size_t k = 0; k < keys.size(); ++k) {
      round_values.push_back(rnd.RandomString(kRandomValueLen));
    }
    for (size_t k = 0; k < keys.size(); ++k) {
      ASSERT_OK(Put(keys[k], round_values[k]));
    }
    for (size_t k = 0; k < keys.size(); ++k) {
      expected_garbage_bytes += entry_bytes(keys[k], round_values[k]);
    }
  }

  // Payload = everything counted as garbage plus the final, live entries.
  uint64_t expected_payload_bytes = expected_garbage_bytes;
  for (size_t k = 0; k < keys.size(); ++k) {
    ASSERT_OK(Put(keys[k], final_values[k]));
  }
  for (size_t k = 0; k < keys.size(); ++k) {
    expected_payload_bytes += entry_bytes(keys[k], final_values[k]);
  }

  // Sanity-check the visible values before flushing.
  PinnableSlice value;
  for (size_t k = 0; k < keys.size(); ++k) {
    ASSERT_OK(Get(keys[k], &value));
    ASSERT_EQ(value.ToString(), final_values[k]);
  }

  ASSERT_OK(Flush());

  const uint64_t mem_data_bytes =
      TestGetTickerCount(opts, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  const uint64_t mem_garbage_bytes =
      TestGetTickerCount(opts, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_data_bytes, expected_payload_bytes);
  EXPECT_EQ(mem_garbage_bytes, expected_garbage_bytes);

  Close();
}
// Repeatedly inserts and then deletes three keys in one memtable, adds three
// deletes for keys that were never written, and checks the
// MEMTABLE_PAYLOAD_BYTES_AT_FLUSH / MEMTABLE_GARBAGE_BYTES_AT_FLUSH tickers
// after a flush. A put is accounted as key + value + sizeof(uint64_t) bytes, a
// delete as key + sizeof(uint64_t) bytes. The expected garbage excludes one
// set of deletes for key1..key3; the expected payload includes everything.
//
// Change from the previous version: an unused local `WriteBatch` was removed.
TEST_F(DBFlushTest, StatisticsGarbageInsertAndDeletes) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 67108864;
  ASSERT_OK(TryReopen(options));

  const size_t NUM_REPEAT = 2000;
  const size_t RAND_VALUES_LENGTH = 37;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string KEY4 = "key4";
  const std::string KEY5 = "key5";
  const std::string KEY6 = "key6";

  // Bytes accounted for three delete entries on the given keys.
  const auto three_deletes_bytes = [](const std::string& a,
                                      const std::string& b,
                                      const std::string& c) -> uint64_t {
    return a.size() + b.size() + c.size() + 3 * sizeof(uint64_t);
  };

  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  Random rnd(301);
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    const std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    const std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    const std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);

    ASSERT_OK(Delete(KEY1));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(Delete(KEY3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        three_deletes_bytes(KEY1, KEY2, KEY3);
  }

  // Payload counts every entry written so far.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  // One set of deletes for key1..key3 is expected not to count as garbage.
  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
      three_deletes_bytes(KEY1, KEY2, KEY3);

  // Deletes for keys that were never inserted add to the payload only.
  ASSERT_OK(Delete(KEY4));
  ASSERT_OK(Delete(KEY5));
  ASSERT_OK(Delete(KEY6));
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      three_deletes_bytes(KEY4, KEY5, KEY6);

  // The deleted keys must not be readable before the flush.
  PinnableSlice value;
  ASSERT_NOK(Get(KEY1, &value));
  ASSERT_NOK(Get(KEY2, &value));
  ASSERT_NOK(Get(KEY3, &value));

  ASSERT_OK(Flush());

  const uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  const uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}
// Repeatedly writes three keys and covers them with two range deletions, then
// writes a final value for key3 plus two more range deletions, and checks the
// MEMTABLE_PAYLOAD_BYTES_AT_FLUSH / MEMTABLE_GARBAGE_BYTES_AT_FLUSH tickers
// after a flush. A put is accounted as key + value + sizeof(uint64_t) bytes,
// a range deletion as begin key + end key + sizeof(uint64_t) bytes.
TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
  Options opts = CurrentOptions();
  opts.statistics = CreateDBStatistics();
  opts.statistics->set_stats_level(StatsLevel::kAll);
  opts.create_if_missing = true;
  opts.compression = kNoCompression;
  opts.inplace_update_support = false;
  opts.allow_concurrent_memtable_write = true;
  opts.write_buffer_size = 67108864;
  ASSERT_OK(TryReopen(opts));

  const size_t kRounds = 1000;
  const size_t kRandomValueLen = 42;
  const std::string key1 = "key1";
  const std::string key2 = "key2";
  const std::string key3 = "key3";
  const std::string key4 = "key4";
  const std::string key5 = "key5";
  const std::string key6 = "key6";
  const std::string final_value3 = "value3";

  // Bytes accounted for a put entry.
  const auto put_bytes = [](const std::string& k,
                            const std::string& v) -> uint64_t {
    return k.size() + v.size() + sizeof(uint64_t);
  };
  // Bytes accounted for a range deletion entry.
  const auto range_del_bytes = [](const std::string& begin,
                                  const std::string& end) -> uint64_t {
    return begin.size() + end.size() + sizeof(uint64_t);
  };

  uint64_t expected_payload_bytes = 0;
  uint64_t expected_garbage_bytes = 0;

  Random rnd(301);
  for (size_t round = 0; round < kRounds; ++round) {
    const std::string v1 = rnd.RandomString(kRandomValueLen);
    const std::string v2 = rnd.RandomString(kRandomValueLen);
    const std::string v3 = rnd.RandomString(kRandomValueLen);

    ASSERT_OK(Put(key1, v1));
    ASSERT_OK(Put(key2, v2));
    ASSERT_OK(Put(key3, v3));
    expected_garbage_bytes += put_bytes(key1, v1);
    expected_garbage_bytes += put_bytes(key2, v2);
    expected_garbage_bytes += put_bytes(key3, v3);

    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               key1, key2));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               key2, key3));
    expected_garbage_bytes +=
        range_del_bytes(key1, key2) + range_del_bytes(key2, key3);
  }

  // Payload counts every entry written so far.
  expected_payload_bytes = expected_garbage_bytes;

  // One set of the two range deletions is expected not to count as garbage.
  expected_garbage_bytes -=
      range_del_bytes(key1, key2) + range_del_bytes(key2, key3);

  ASSERT_OK(Put(key3, final_value3));
  expected_payload_bytes += put_bytes(key3, final_value3);

  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), key4, key5));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), key5, key6));
  expected_payload_bytes +=
      range_del_bytes(key4, key5) + range_del_bytes(key5, key6);

  // key1 and key2 are gone; key3 holds its final value.
  PinnableSlice value;
  ASSERT_NOK(Get(key1, &value));
  ASSERT_NOK(Get(key2, &value));
  ASSERT_OK(Get(key3, &value));
  ASSERT_EQ(value, final_value3);

  ASSERT_OK(Flush());

  const uint64_t mem_data_bytes =
      TestGetTickerCount(opts, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  const uint64_t mem_garbage_bytes =
      TestGetTickerCount(opts, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_data_bytes, expected_payload_bytes);
  EXPECT_EQ(mem_garbage_bytes, expected_garbage_bytes);

  Close();
}
class TestFlushListener : public EventListener {
public:
TestFlushListener(Env* env, DBFlushTest* test)
: slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
db_closed = false;
}
~TestFlushListener() override {
prev_fc_info_.status.PermitUncheckedError(); }
void OnTableFileCreated(const TableFileCreationInfo& info) override {
prev_fc_info_ = info;
ASSERT_GT(info.db_name.size(), 0U);
ASSERT_GT(info.cf_name.size(), 0U);
ASSERT_GT(info.file_path.size(), 0U);
ASSERT_GT(info.job_id, 0);
ASSERT_GT(info.table_properties.data_size, 0U);
ASSERT_GT(info.table_properties.raw_key_size, 0U);
ASSERT_GT(info.table_properties.raw_value_size, 0U);
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
ASSERT_GT(info.table_properties.num_entries, 0U);
ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
}
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(info.cf_name);
if (info.triggered_writes_slowdown) {
slowdown_count++;
}
if (info.triggered_writes_stop) {
stop_count++;
}
ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
ASSERT_TRUE(test_);
if (db == test_->db_.get()) {
std::vector<std::vector<FileMetaData>> files_by_level;
test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
&files_by_level);
ASSERT_FALSE(files_by_level.empty());
auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
[&](const FileMetaData& meta) {
return meta.fd.GetNumber() == info.file_number;
});
ASSERT_NE(it, files_by_level[0].end());
ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
}
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
ASSERT_GT(info.thread_id, 0U);
}
std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_;
int slowdown_count;
int stop_count;
bool db_closing;
std::atomic_bool db_closed;
TableFileCreationInfo prev_fc_info_;
protected:
Env* env_;
DBFlushTest* test_;
};
// With atomic flush enabled and WAL disabled, triggers a flush through
// GetLiveFiles() while a helper thread writes to both column families. The
// helper's writes are ordered after the
// "DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait" sync point. After a
// reopen (with avoid_flush_during_shutdown set), both "k2" in the default CF
// and "k3" in cf1 must be readable.
TEST_F(
    DBFlushTest,
    FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
  Options opts = CurrentOptions();
  opts.atomic_flush = true;
  opts.avoid_flush_during_shutdown = true;
  opts.level0_stop_writes_trigger = 2;
  opts.level0_slowdown_writes_trigger = 2;
  opts.level0_file_num_compaction_trigger = -1;
  CreateAndReopenWithCF({"cf1"}, opts);

  // Occupy the single LOW-priority thread until the helper thread wakes it.
  auto low_pri_blocker = std::make_unique<test::SleepingBackgroundTask>();
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 low_pri_blocker.get(), Env::Priority::LOW);
  low_pri_blocker->WaitUntilSleeping();

  // One flushed file in cf1 up front.
  ASSERT_OK(Put(1, "dontcare", "dontcare"));
  ASSERT_OK(Flush(1));

  // Unflushed, WAL-less data for the upcoming flush.
  WriteOptions no_wal;
  no_wal.disableWAL = true;
  ASSERT_OK(Put(1, "k1", "v1", no_wal));

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->LoadDependency({{
      "DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
      "DBFlushTest::"
      "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::Write",
  }});
  sync_point->EnableProcessing();

  port::Thread writer([&] {
    // Runs only after the flush has reached the stall wait.
    TEST_SYNC_POINT(
        "DBFlushTest::"
        "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::"
        "Write");
    ASSERT_OK(Put(0, "k2", "v2", no_wal));
    ASSERT_OK(Put(1, "k3", "v3", no_wal));

    // Release the LOW-priority thread and move cf1's files down a level.
    low_pri_blocker->WakeUp();
    low_pri_blocker->WaitUntilDone();
    MoveFilesToLevel(1, 1);
  });

  std::vector<std::string> live_files;
  uint64_t manifest_size;
  ASSERT_OK(db_->GetLiveFiles(live_files, &manifest_size, true));

  writer.join();

  ReopenWithColumnFamilies({"default", "cf1"}, opts);

  ASSERT_EQ(Get(1, "k3"), "v3");
  ASSERT_EQ(Get(0, "k2"), "v2");

  sync_point->DisableProcessing();
}
// While the single HIGH-priority thread is parked, GetLiveFiles() schedules
// an atomic flush. The "DBImpl::AtomicFlushMemTables:AfterScheduleFlush"
// callback then requests a second, non-blocking flush before releasing the
// parked thread. The test passes if GetLiveFiles() succeeds and the callback
// ran.
TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) {
  Options opts = CurrentOptions();
  opts.atomic_flush = true;
  opts.disable_auto_compactions = true;
  CreateAndReopenWithCF({"cf1"}, opts);

  const int kNumKeys = 1;
  for (int k = 0; k < kNumKeys; ++k) {
    ASSERT_OK(Put(0, Key(k), std::string(1, 'v')));
    ASSERT_OK(Put(1, Key(k), std::string(1, 'v')));
  }

  // Park the only HIGH-priority thread.
  auto high_pri_blocker = std::make_shared<test::SleepingBackgroundTask>();
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 high_pri_blocker.get(), Env::Priority::HIGH);
  high_pri_blocker->WaitUntilSleeping();

  auto* const sync_point = SyncPoint::GetInstance();

  bool callback_ran = false;
  sync_point->SetCallBack(
      "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void* /*arg*/) {
        // Act on the first invocation only; the nested Flush() below reaches
        // this sync point again.
        if (!callback_ran) {
          callback_ran = true;

          FlushOptions nested_flush;
          nested_flush.wait = false;
          nested_flush.allow_write_stall = true;
          ASSERT_OK(dbfull()->Flush(nested_flush));

          high_pri_blocker->WakeUp();
          high_pri_blocker->WaitUntilDone();
        }
      });
  sync_point->EnableProcessing();

  std::vector<std::string> live_files;
  uint64_t manifest_size;
  ASSERT_OK(db_->GetLiveFiles(live_files, &manifest_size, true));
  ASSERT_TRUE(callback_ran);

  sync_point->ClearAllCallBacks();
  sync_point->DisableProcessing();
}
// Exercises mempurge with a 1 MiB write buffer. The DB is opened with
// experimental_mempurge_threshold = 0.0 and then switched to "1.0" via
// SetOptions().
//
// Phase 1: keys 1-4 are written once, keys 5-9 are overwritten repeatedly.
//          Expects at least one successful mempurge and no SST file created.
// Phase 2: fresh random keys are inserted (no overwrites).
//          Expects at least one SST file and at least one mempurge.
// All values must stay readable throughout.
//
// Changes from the previous version: the unused `NOT_FOUND` local was removed
// and the repeated per-key statements were folded into loops. The order of
// random draws, writes and reads is unchanged.
TEST_F(DBFlushTest, MemPurgeBasic) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = 0.0;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  ASSERT_OK(TryReopen(options));

  // Change the threshold dynamically, without reopening the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}}));

  // Count successful mempurges and SST files created by flush jobs.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const size_t NUM_FIXED_KEYS = 9;       // "IamKey1" .. "IamKey9"
  const size_t NUM_WRITE_ONCE_KEYS = 4;  // keys 1-4 are never overwritten
  const size_t NUM_RANDOM_KEYS = 3;
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEYS_LENGTH = 57;
  const size_t RAND_VALUES_LENGTH = 10240;

  std::vector<std::string> keys;
  for (size_t k = 1; k <= NUM_FIXED_KEYS; k++) {
    keys.push_back("IamKey" + std::to_string(k));
  }
  std::vector<std::string> values(NUM_FIXED_KEYS);
  std::vector<std::string> rnd_keys(NUM_RANDOM_KEYS);
  std::vector<std::string> rnd_values(NUM_RANDOM_KEYS);

  Random rnd(719);

  // Keys 1-4: written exactly once.
  for (size_t k = 0; k < NUM_WRITE_ONCE_KEYS; k++) {
    values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
  }
  for (size_t k = 0; k < NUM_WRITE_ONCE_KEYS; k++) {
    ASSERT_OK(Put(keys[k], values[k]));
  }
  for (size_t k = 0; k < NUM_WRITE_ONCE_KEYS; k++) {
    ASSERT_EQ(Get(keys[k]), values[k]);
  }

  // Phase 1: heavy overwrites of keys 5-9.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t k = NUM_WRITE_ONCE_KEYS; k < NUM_FIXED_KEYS; k++) {
      values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
    }
    for (size_t k = NUM_WRITE_ONCE_KEYS; k < NUM_FIXED_KEYS; k++) {
      ASSERT_OK(Put(keys[k], values[k]));
    }
    for (size_t k = 0; k < NUM_FIXED_KEYS; k++) {
      ASSERT_EQ(Get(keys[k]), values[k]);
    }
  }

  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Phase 2: insert fresh random keys each round.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t r = 0; r < NUM_RANDOM_KEYS; r++) {
      rnd_keys[r] = rnd.RandomString(RAND_KEYS_LENGTH);
    }
    for (size_t r = 0; r < NUM_RANDOM_KEYS; r++) {
      rnd_values[r] = rnd.RandomString(RAND_VALUES_LENGTH);
    }
    for (size_t r = 0; r < NUM_RANDOM_KEYS; r++) {
      ASSERT_OK(Put(rnd_keys[r], rnd_values[r]));
    }
    for (size_t k = 0; k < NUM_FIXED_KEYS; k++) {
      ASSERT_EQ(Get(keys[k]), values[k]);
    }
    for (size_t r = 0; r < NUM_RANDOM_KEYS; r++) {
      ASSERT_EQ(Get(rnd_keys[r]), rnd_values[r]);
    }
  }

  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);

  // Final read-back of the fixed keys and the last round's random keys.
  for (size_t k = 0; k < NUM_FIXED_KEYS; k++) {
    ASSERT_EQ(Get(keys[k]), values[k]);
  }
  for (size_t r = 0; r < NUM_RANDOM_KEYS; r++) {
    ASSERT_EQ(Get(rnd_keys[r]), rnd_values[r]);
  }

  Close();
}
// Toggles experimental_mempurge_threshold at runtime through SetOptions().
//
// Phase 1 (threshold set to "3.7898"): three keys are overwritten repeatedly.
//          Expects at least one successful mempurge and no SST file created.
// Phase 2 (threshold set to "-1023.0"): same workload.
//          Expects at least one SST file and zero mempurges.
//
// Change from the previous version: the unused `NOT_FOUND` local was removed.
TEST_F(DBFlushTest, MemPurgeBasicToggle) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = -25.3;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  ASSERT_OK(TryReopen(options));

  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}}));

  // Count successful mempurges and SST files created by flush jobs.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const size_t KVSIZE = 3;
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;

  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }
  std::vector<std::string> RNDVALS(KVSIZE);
  Random rnd(719);

  // Phase 1: overwrite-heavy workload.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    // Re-read every key once the whole round has been written.
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Phase 2: same workload after changing the threshold again.
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}}));

  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  const uint32_t ZERO = 0;
  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  EXPECT_EQ(mempurge_count.exchange(0), ZERO);

  Close();
}
// Combines a non-zero experimental_mempurge_threshold with atomic_flush over
// three column families. The default CF receives overwrites of three fixed
// keys while CF 1 receives fresh random keys. Expects zero successful
// mempurges and at least one SST file created.
//
// Changes from the previous version: the unused `NOT_FOUND` local and the
// `RNDVALS` vector (never read across iterations) were removed, and the
// column-family count is compared as unsigned to avoid a signed/unsigned
// comparison inside ASSERT_EQ.
TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = 153.245;
  options.atomic_flush = true;

  const std::vector<std::string> new_cf_names = {"pikachu", "eevie"};
  CreateColumnFamilies(new_cf_names, options);

  Close();

  ReopenWithColumnFamilies(
      {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options);
  ASSERT_EQ(3U, handles_.size());

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(2, "bar", "baz"));

  // Count successful mempurges and SST files created by flush jobs.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const size_t KVSIZE = 3;
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEY_LENGTH = 128;
  const size_t RAND_VALUES_LENGTH = 10240;

  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }

  Random rnd(106);
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      // Draw order (key first, then value) is kept so the random sequence is
      // unchanged.
      const std::string random_key = rnd.RandomString(RAND_KEY_LENGTH);
      const std::string random_value = rnd.RandomString(RAND_VALUES_LENGTH);
      // Default CF: overwrite a fixed key. CF 1: insert a fresh key.
      ASSERT_OK(Put(KEYS[j], random_value));
      ASSERT_OK(Put(1, random_key, random_value));
      ASSERT_EQ(Get(KEYS[j]), random_value);
      ASSERT_EQ(Get(1, random_key), random_value);
    }
  }

  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0;
  const uint32_t EXPECTED_SST_COUNT = 1;
  EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT);

  Close();
}
// Exercises mempurge with point deletes and range deletes. Each round writes
// five keys, deletes some via Delete()/DeleteRange(), rewrites key 3, and
// then verifies both Get() and a full iterator scan (expected to see keys 3,
// 4 and 5). After the loop it expects at least one successful mempurge and no
// SST file created. It then checks that an iterator created before a later
// Put still yields four keys (2, 3, 4, 5).
//
// Changes from the previous version:
//  * Iterators are held in std::unique_ptr, so they are released even when an
//    ASSERT_* returns early from the test body. The `if (iter)` checks, which
//    came after the pointer had already been dereferenced, are gone.
//  * The final scan now also checks iter->status() after the loop, as the
//    in-loop scan already did.
TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = 15.0;
  ASSERT_OK(TryReopen(options));

  // Count successful mempurges and SST files created by flush jobs.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  const std::string NOT_FOUND = "NOT_FOUND";

  Random rnd(117);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;

  // Declared outside the loop: the last round's values are checked again by
  // the final scan below.
  std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
  int count = 0;
  const int EXPECTED_COUNT_FORLOOP = 3;
  const int EXPECTED_COUNT_END = 4;

  ReadOptions ropt;
  ropt.pin_data = true;
  ropt.total_order_seek = true;

  for (size_t i = 0; i < NUM_REPEAT; i++) {
    p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3b = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    ASSERT_OK(Put(KEY4, p_v4));
    ASSERT_OK(Put(KEY5, p_v5));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
                               KEY4));
    // Key 3 is rewritten after the range delete that covered it.
    ASSERT_OK(Put(KEY3, p_v3b));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
                               KEY3));
    ASSERT_OK(Delete(KEY1));

    ASSERT_EQ(Get(KEY1), NOT_FOUND);
    ASSERT_EQ(Get(KEY2), NOT_FOUND);
    ASSERT_EQ(Get(KEY3), p_v3b);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);

    // Full scan: only keys 3, 4 and 5 should be visible.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ropt));
    iter->SeekToFirst();
    count = 0;
    for (; iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      key = (iter->key()).ToString(false);
      value = (iter->value()).ToString(false);
      if (key.compare(KEY3) == 0) {
        ASSERT_EQ(value, p_v3b);
      } else if (key.compare(KEY4) == 0) {
        ASSERT_EQ(value, p_v4);
      } else if (key.compare(KEY5) == 0) {
        ASSERT_EQ(value, p_v5);
      } else {
        // Any other key is unexpected; this assertion reports it.
        ASSERT_EQ(value, NOT_FOUND);
      }
      count++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP);
  }

  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // The iterator is created after the Put of key 2 and before the Put of
  // key 4.
  ASSERT_OK(Put(KEY2, p_v2));
  std::unique_ptr<Iterator> iter(db_->NewIterator(ropt));
  iter->SeekToFirst();
  ASSERT_OK(Put(KEY4, p_v4));
  count = 0;
  for (; iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    key = (iter->key()).ToString(false);
    value = (iter->value()).ToString(false);
    if (key.compare(KEY2) == 0) {
      ASSERT_EQ(value, p_v2);
    } else if (key.compare(KEY3) == 0) {
      ASSERT_EQ(value, p_v3b);
    } else if (key.compare(KEY4) == 0) {
      ASSERT_EQ(value, p_v4);
    } else if (key.compare(KEY5) == 0) {
      ASSERT_EQ(value, p_v5);
    } else {
      ASSERT_EQ(value, NOT_FOUND);
    }
    count++;
  }
  ASSERT_OK(iter->status());
  ASSERT_EQ(count, EXPECTED_COUNT_END);

  // Release the iterator before closing the DB, as the original ordering did.
  iter.reset();

  Close();
}
// Compaction filter used by the mempurge tests: every key that compares
// strictly less than `*filtered_key` has its value replaced by NEW_VALUE.
// Filter() always returns false, so no entry is ever removed by it.
class ConditionalUpdateFilter : public CompactionFilter {
 public:
  // `filtered_key` is borrowed, not owned; it must outlive this filter.
  explicit ConditionalUpdateFilter(const std::string* filtered_key)
      : filtered_key_(filtered_key) {}

  bool Filter(int /*level*/, const Slice& key, const Slice& /*existing_value*/,
              std::string* new_value, bool* value_changed) const override {
    const bool sorts_before_threshold = key.compare(*filtered_key_) < 0;
    if (!sorts_before_threshold) {
      // Keys at or after the threshold are left untouched.
      return false;
    }
    assert(new_value != nullptr);
    *new_value = NEW_VALUE;
    *value_changed = true;
    return false;
  }

  const char* Name() const override { return "ConditionalUpdateFilter"; }

 private:
  // Threshold key; see the constructor for the lifetime requirement.
  const std::string* filtered_key_;
};
// Factory producing ConditionalUpdateFilter instances that all share the
// threshold key stored in this factory. It asks for filtering only when a
// table file is created because of a flush.
class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
 public:
  // Copies `filtered_key` so the factory owns the threshold string.
  explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
      : filtered_key_(filtered_key.ToString()) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& /*context*/) override {
    // The filter keeps a pointer into this factory, so the factory has to
    // outlive every filter it hands out.
    std::unique_ptr<CompactionFilter> filter(
        new ConditionalUpdateFilter(&filtered_key_));
    return filter;
  }

  const char* Name() const override { return "ConditionalUpdateFilterFactory"; }

  bool ShouldFilterTableFileCreation(
      TableFileCreationReason reason) const override {
    return TableFileCreationReason::kFlush == reason;
  }

 private:
  std::string filtered_key_;
};
// Checks that a compaction filter which opts in to flush-time filtering
// (ConditionalUpdateFilterFactory, threshold KEY4) is applied to entries that
// go through a mempurge: after enough overwrites to trigger at least one
// mempurge and no SST creation, keys ordered before KEY4 must read back as
// NEW_VALUE while KEY4 and KEY5 keep their original values.
TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
  Options options = CurrentOptions();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  std::string KEY6 = "ThisIsKey6";
  std::string KEY7 = "ThisIsKey7";
  std::string KEY8 = "ThisIsKey8";
  std::string KEY9 = "ThisIsKey9";
  const std::string NOT_FOUND = "NOT_FOUND";

  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  // Keys that compare less than KEY4 get their value rewritten by the filter.
  options.compaction_filter_factory =
      std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = 26.55;
  ASSERT_OK(TryReopen(options));

  // Count mempurges and SST creations through the flush job's sync points.
  // Both counters live on this test's stack and are captured by reference.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(53);
  const size_t NUM_REPEAT = 1000;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9;

  // Written once; these are the entries whose final values are checked.
  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
  ASSERT_OK(Put(KEY1, p_v1));
  ASSERT_OK(Put(KEY2, p_v2));
  ASSERT_OK(Put(KEY3, p_v3));
  ASSERT_OK(Put(KEY4, p_v4));
  ASSERT_OK(Put(KEY5, p_v5));
  ASSERT_OK(Delete(KEY1));

  // Repeated overwrites of KEY6..KEY9 generate the garbage that the mempurge
  // is expected to reclaim.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY6, p_v6));
    ASSERT_OK(Put(KEY7, p_v7));
    ASSERT_OK(Put(KEY8, p_v8));
    ASSERT_OK(Put(KEY9, p_v9));
    ASSERT_OK(Delete(KEY7));
  }

  // At least one mempurge happened and no SST file was written.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  ASSERT_EQ(Get(KEY1), NOT_FOUND);
  ASSERT_EQ(Get(KEY2), NEW_VALUE);
  ASSERT_EQ(Get(KEY3), NEW_VALUE);
  ASSERT_EQ(Get(KEY4), p_v4);
  ASSERT_EQ(Get(KEY5), p_v5);

  // Close the DB while the counters are still alive, as the sibling mempurge
  // tests do. The callbacks above remain registered and reference stack
  // locals, so no flush job may still be able to run once this scope ends.
  Close();
}
// Disabled test (note the DISABLED_ prefix). Writes to two column families
// ("default" and "pikachu") with mempurge enabled, reopens the DB several
// times, and checks that every value written before a reopen is still
// readable afterwards. The whole scenario is repeated for as long as
// ChangeWalOptions() returns true.
TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
Options options = CurrentOptions();
options.statistics = CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kAll);
options.create_if_missing = true;
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
options.write_buffer_size = 128 << 10;
options.experimental_mempurge_threshold = 2.5;
ASSERT_OK(TryReopen(options));
// Number of "IamKey<k>" keys tracked per column family.
const size_t KVSIZE = 10;
do {
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Put(1, "baz", "v5"));
// First reopen: the two small writes above must survive it.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// NOTE(review): the next assertion appears twice in a row; the second one
// looks redundant.
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v5", Get(1, "baz"));
ASSERT_OK(Put(0, "bar", "v2"));
ASSERT_OK(Put(1, "bar", "v2"));
ASSERT_OK(Put(1, "foo", "v3"));
// Counters for the two flush-job sync points. They are locals of the loop
// body captured by reference; the callbacks are re-registered on every
// iteration and are not cleared anywhere in this test.
std::atomic<uint32_t> mempurge_count{0};
std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* ) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:SSTFileCreated", [&](void* ) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::string> keys;
for (size_t k = 0; k < KVSIZE; k++) {
keys.push_back("IamKey" + std::to_string(k));
}
std::string RNDKEY, RNDVALUE;
const std::string NOT_FOUND = "NOT_FOUND";
Random rnd(719);
const size_t NUM_REPEAT = 100;
const size_t RAND_KEY_LENGTH = 4096;
const size_t RAND_VALUES_LENGTH = 1024;
std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);
// First half of the keys: written once to both column families and never
// overwritten afterwards.
for (size_t k = 0; k < KVSIZE / 2; k++) {
values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
}
for (size_t k = 0; k < KVSIZE / 2; k++) {
ASSERT_OK(Put(0, keys[k], values_default[k]));
ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
}
for (size_t k = 0; k < KVSIZE / 2; k++) {
ASSERT_EQ(Get(0, keys[k]), values_default[k]);
ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
}
// Second half of the keys, default column family: overwritten NUM_REPEAT
// times, re-checking all tracked values after each round.
for (size_t j = 0; j < NUM_REPEAT; j++) {
for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
}
for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
ASSERT_OK(Put(0, keys[k], values_default[k]));
}
for (size_t k = 0; k < KVSIZE; k++) {
ASSERT_EQ(Get(0, keys[k]), values_default[k]);
}
for (size_t k = 0; k < KVSIZE / 2; k++) {
ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
}
}
// Same overwrite pattern for the "pikachu" column family.
for (size_t j = 0; j < NUM_REPEAT; j++) {
for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
}
for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
}
for (size_t k = 0; k < KVSIZE; k++) {
ASSERT_EQ(Get(0, keys[k]), values_default[k]);
ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
}
}
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
// The SST count is only checked when the threshold is the maximum double.
// NOTE(review): the threshold is set to 2.5 above and `options` is not
// reassigned in this test, so this branch appears never to be taken.
if (options.experimental_mempurge_threshold ==
std::numeric_limits<double>::max()) {
EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
}
// Second reopen: everything written so far must still be readable.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
ASSERT_EQ("v3", Get(1, "foo"));
ASSERT_OK(Put(1, "foo", "v4"));
ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v2", Get(1, "bar"));
ASSERT_EQ("v5", Get(1, "baz"));
for (size_t k = 0; k < KVSIZE; k++) {
ASSERT_EQ(Get(0, keys[k]), values_default[k]);
ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
}
// Insert NUM_REPEAT distinct random keys; this time at least one SST file
// is expected to be created.
for (size_t j = 0; j < NUM_REPEAT; j++) {
RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(1, RNDKEY, RNDVALUE));
}
EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
// Third reopen: final consistency check, including the last random key.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v2", Get(1, "bar"));
ASSERT_EQ("v5", Get(1, "baz"));
for (size_t k = 0; k < KVSIZE; k++) {
ASSERT_EQ(Get(0, keys[k]), values_default[k]);
ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
}
ASSERT_EQ(Get(1, RNDKEY), RNDVALUE);
} while (ChangeWalOptions());
}
// Exercises mempurge followed by a regular flush: repeated overwrites of a
// small key set must be absorbed by mempurge without creating an SST file,
// then a stream of distinct random keys must eventually produce an SST file
// whose flush picked at least two memtables. All written values must stay
// readable throughout.
TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 1 << 20;
  options.experimental_mempurge_threshold = 1.0;
  options.min_write_buffer_number_to_merge = 3;
  options.max_write_buffer_number = 5;
  options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
  options.disable_auto_compactions = true;
  ASSERT_OK(TryReopen(options));

  // Count mempurges and SST creations through the flush job's sync points.
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Remember how many memtables the first L0 table write covered. Only the
  // first non-zero report is kept: the compare-exchange succeeds only while
  // the stored value is still 0.
  std::atomic<uint64_t> num_memtable_at_first_flush(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
        uint64_t* mems_size = static_cast<uint64_t*>(arg);
        // `expected` is local to each invocation, so the callback shares no
        // mutable state besides the atomic itself.
        uint64_t expected = 0;
        num_memtable_at_first_flush.compare_exchange_strong(expected,
                                                            *mems_size);
      });

  const std::vector<std::string> KEYS = {
      "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
      "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};

  Random rnd(117);
  const uint64_t NUM_REPEAT_OVERWRITES = 100;
  const uint64_t NUM_RAND_INSERTS = 500;
  const uint64_t RAND_VALUES_LENGTH = 10240;

  std::string key, value;
  std::vector<std::string> values(9, "");

  // Keys 0..4 are written once and never overwritten.
  for (uint64_t k = 0; k < 5; k++) {
    values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEYS[k], values[k]));
  }

  // Keys 5..8 are overwritten repeatedly; every round re-checks all nine.
  for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
    for (uint64_t k = 5; k < values.size(); k++) {
      values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[k], values[k]));
    }
    for (uint64_t k = 0; k < values.size(); k++) {
      ASSERT_EQ(Get(KEYS[k]), values[k]);
    }
  }

  // So far: at least one mempurge and no SST file.
  uint32_t expected_min_mempurge_count = 1;
  uint32_t expected_sst_count = 0;
  EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
  EXPECT_EQ(sst_count.load(), expected_sst_count);

  // Distinct random keys cannot be reclaimed as garbage, so these inserts
  // are expected to end in a real flush.
  for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
    key = rnd.RandomString(RAND_VALUES_LENGTH);
    value = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(key, value));
    for (uint64_t k = 0; k < values.size(); k++) {
      ASSERT_EQ(Get(KEYS[k]), values[k]);
    }
    ASSERT_EQ(Get(key), value);
  }

  // At least one SST file now exists, and the first flush covered at least
  // two memtables.
  expected_sst_count = 1;
  EXPECT_GE(sst_count.load(), expected_sst_count);
  ASSERT_GE(num_memtable_at_first_flush.load(), 2);

  for (uint64_t k = 0; k < values.size(); k++) {
    ASSERT_EQ(Get(KEYS[k]), values[k]);
  }
  ASSERT_EQ(Get(key), value);

  Close();
}
// Parameterized on whether direct I/O is requested for flush and compaction.
// Verifies that the flag reported at the "BuildTable:create_file" sync point
// matches the configured option.
TEST_P(DBFlushDirectIOTest, DirectIO) {
  const bool expect_direct_io = GetParam();

  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.use_direct_io_for_flush_and_compaction = expect_direct_io;
  // Owned by this test; released by the explicit delete at the end.
  options.env = MockEnv::Create(Env::Default());

  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:create_file", [expect_direct_io](void* arg) {
        const bool* reported_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*reported_direct_writes, expect_direct_io);
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("foo", "v"));

  FlushOptions blocking_flush;
  blocking_flush.wait = true;
  ASSERT_OK(dbfull()->Flush(blocking_flush));

  Destroy(options);
  delete options.env;
}
// Switching the memtable while the fault-injection filesystem is inactive
// must return a non-OK status.
TEST_F(DBFlushTest, FlushError) {
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));

  // Only the memtable switch runs with the filesystem deactivated.
  fault_env->SetFilesystemActive(false);
  Status switch_status = dbfull()->TEST_SwitchMemtable();
  fault_env->SetFilesystemActive(true);

  // Clean up first; the status is checked last.
  Destroy(options);
  ASSERT_NE(switch_status, Status::OK());
}
// After a background flush fails (filesystem deactivated) and a background
// error is recorded, a subsequent manual flush must return an error.
TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  options.max_write_buffer_number = 2;
  Reopen(options);

  // Queue a non-blocking flush while background work is paused.
  ASSERT_OK(db_->PauseBackgroundWork());
  ASSERT_OK(Put("key1", "value1"));
  FlushOptions nonblocking_flush;
  nonblocking_flush.wait = false;
  ASSERT_OK(db_->Flush(nonblocking_flush));

  // Write more data, then make the filesystem fail before resuming
  // background work; waiting for the flush must then report an error.
  ASSERT_OK(Put("key2", "value2"));
  fault_env->SetFilesystemActive(false);
  ASSERT_OK(db_->ContinueBackgroundWork());
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());

  uint64_t bg_error_count = 0;
  ASSERT_TRUE(
      db_->GetIntProperty(DB::Properties::kBackgroundErrors, &bg_error_count));
  ASSERT_GT(bg_error_count, 0);

  // A further manual flush must fail as well.
  ASSERT_NOK(db_->Flush(FlushOptions()));

  Close();
}
// Races a column-family drop (on a separate thread) against a flush of that
// same column family, and expects the flush call to return a non-OK status.
TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
Options options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({"pikachu"}, options);
SyncPoint::GetInstance()->DisableProcessing();
// Three ordering constraints, each a {predecessor, successor} pair. Going by
// the sync-point names they chain as:
//   flush scheduled -> drop thread proceeds,
//   drop thread has freed the handle -> background flush starts,
//   background flush starts -> foreground proceeds to wait for the flush.
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
{"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BackgroundCallFlush:start",
"DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(2, handles_.size());
ASSERT_OK(Put(1, "key", "value"));
// Grab the column family data pointer before the handle is destroyed by the
// drop thread below.
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
// Drops the column family, destroys its handle and shrinks handles_ so the
// fixture does not touch the destroyed handle later.
port::Thread drop_cf_thr([&]() {
TEST_SYNC_POINT(
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
handles_.resize(1);
TEST_SYNC_POINT(
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
});
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
// The flush targets the column family that is dropped concurrently.
ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
drop_cf_thr.join();
Close();
SyncPoint::GetInstance()->DisableProcessing();
}
// Runs two overlapping flushes (one per written key) and checks, from inside
// OnFlushCompleted, that the sequence number covered by the completed flush
// is already below the earliest sequence number of the current immutable
// memtable list.
TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
// Listener that expects exactly two flush completions, identified by the
// sequence numbers seq1 and seq2 which the test body fills in.
class TestListener : public EventListener {
public:
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
// Each flush in this test is expected to cover a single, non-zero
// sequence number.
ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
ASSERT_NE(0, info.smallest_seqno);
if (info.smallest_seqno == seq1) {
// First flush: must be reported only once.
ASSERT_FALSE(completed1);
completed1 = true;
CheckFlushResultCommitted(db, seq1);
} else {
// Anything else must be the second flush, also reported only once.
ASSERT_FALSE(completed2);
completed2 = true;
ASSERT_EQ(info.smallest_seqno, seq2);
CheckFlushResultCommitted(db, seq2);
}
}
// Under the DB mutex, asserts that `seq` is smaller than the earliest
// sequence number of the default column family's current immutable
// memtable list.
// NOTE(review): if ASSERT_LT fails it returns from this function before
// mutex->Unlock() runs, leaving the DB mutex locked.
void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
mutex->Lock();
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
db->DefaultColumnFamily())
->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}
// Sequence numbers of the two writes, set by the test body.
std::atomic<SequenceNumber> seq1{0};
std::atomic<SequenceNumber> seq2{0};
// Set once the corresponding flush completion has been observed.
std::atomic<bool> completed1{false};
std::atomic<bool> completed2{false};
};
std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
// {predecessor, successor} pairs: the test thread's "WaitFirst" point waits
// until memtables have been picked, and the "WaitSecond" point (hit from the
// callback below) waits until a FlushMemTableToOutputFile call has finished.
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
{"DBImpl::FlushMemTableToOutputFile:Finish",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
// Only the flush whose first memtable has earliest sequence number
// seq1 - 1 is held at "WaitSecond".
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&listener](void* arg) {
auto* mems = static_cast<autovector<MemTable*>*>(arg);
if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
"WaitSecond");
}
});
Options options = CurrentOptions();
options.create_if_missing = true;
options.listeners.push_back(listener);
options.max_background_jobs = 8;
options.max_write_buffer_number = 3;
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
// First write and its blocking flush, issued from a separate thread.
ASSERT_OK(Put("foo", "v"));
listener->seq1 = db_->GetLatestSequenceNumber();
auto t1 = port::Thread([&]() {
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Wait until the first flush has picked its memtables before writing again.
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
// Second write and a non-blocking flush for it.
ASSERT_OK(Put("bar", "v"));
listener->seq2 = db_->GetLatestSequenceNumber();
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
t1.join();
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
// Both completions must have been delivered to the listener.
ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Flushes one value shorter than min_blob_size and one that reaches it, then
// checks the resulting table-file and blob-file metadata and the flush
// statistics.
TEST_F(DBFlushTest, FlushWithBlob) {
  constexpr uint64_t min_blob_size = 10;

  Options options;
  options.enable_blob_files = true;
  options.min_blob_size = min_blob_size;
  options.disable_auto_compactions = true;
  options.env = env_;
  Reopen(options);

  // One value below the blob threshold and one at or above it.
  constexpr char short_value[] = "short";
  static_assert(sizeof(short_value) - 1 < min_blob_size,
                "short_value too long");
  constexpr char long_value[] = "long_value";
  static_assert(sizeof(long_value) - 1 >= min_blob_size,
                "long_value too short");

  ASSERT_OK(Put("key1", short_value));
  ASSERT_OK(Put("key2", long_value));
  ASSERT_OK(Flush());

  ASSERT_EQ(Get("key1"), short_value);
  ASSERT_EQ(Get("key2"), long_value);

  // Walk from the version set down to the current version's storage info.
  VersionSet* const version_set = dbfull()->GetVersionSet();
  assert(version_set);
  ColumnFamilyData* const default_cfd =
      version_set->GetColumnFamilySet()->GetDefault();
  assert(default_cfd);
  Version* const current_version = default_cfd->current();
  assert(current_version);
  const VersionStorageInfo* const vstorage = current_version->storage_info();
  assert(vstorage);

  // Exactly one L0 table file and one blob file are expected.
  const auto& level0_files = vstorage->LevelFiles(0);
  ASSERT_EQ(level0_files.size(), 1);
  const FileMetaData* const sst_meta = level0_files[0];
  assert(sst_meta);

  const auto& blob_metas = vstorage->GetBlobFiles();
  ASSERT_EQ(blob_metas.size(), 1);
  const auto& blob_meta = blob_metas.front();
  assert(blob_meta);

  ASSERT_EQ(sst_meta->smallest.user_key(), "key1");
  ASSERT_EQ(sst_meta->largest.user_key(), "key2");
  ASSERT_EQ(sst_meta->fd.smallest_seqno, 1);
  ASSERT_EQ(sst_meta->fd.largest_seqno, 2);
  ASSERT_EQ(sst_meta->oldest_blob_file_number, blob_meta->GetBlobFileNumber());
  ASSERT_EQ(blob_meta->GetTotalBlobCount(), 1);

  // Flush statistics must agree with the file metadata.
  const InternalStats* const stats = default_cfd->internal_stats();
  assert(stats);
  const auto& comp_stats = stats->TEST_GetCompactionStats();
  ASSERT_FALSE(comp_stats.empty());
  ASSERT_EQ(comp_stats[0].bytes_written, sst_meta->fd.GetFileSize());
  ASSERT_EQ(comp_stats[0].bytes_written_blob, blob_meta->GetTotalBlobBytes());
  ASSERT_EQ(comp_stats[0].num_output_files, 1);
  ASSERT_EQ(comp_stats[0].num_output_files_blob, 1);

  const uint64_t* const cf_stats = stats->TEST_GetCFStatsValue();
  ASSERT_EQ(cf_stats[InternalStats::BYTES_FLUSHED],
            comp_stats[0].bytes_written + comp_stats[0].bytes_written_blob);
}
// Checksum handoff is enabled for table files. Two scenarios are run, each
// expecting the flush to end with severity kUnrecoverableError:
//   1. the fault-injection FS is switched to kxxHash at "FlushJob::Start";
//   2. the fault-injection FS corrupts data before the write.
TEST_F(DBFlushTest, FlushWithChecksumHandoff1) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  auto* const sync_points = SyncPoint::GetInstance();

  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kTableFile);
  Reopen(options);

  // Scenario 1: the handoff checksum type changes when the flush job starts.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());

  sync_points->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  sync_points->EnableProcessing();
  Status flush_status = Flush();
  ASSERT_EQ(flush_status.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
  sync_points->DisableProcessing();
  Destroy(options);
  Reopen(options);

  // Scenario 2: data corruption is injected when the flush job starts.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());

  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  sync_points->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->IngestDataCorruptionBeforeWrite();
  });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  sync_points->EnableProcessing();
  flush_status = Flush();
  ASSERT_EQ(flush_status.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
  sync_points->DisableProcessing();
  Destroy(options);
}
// Counterpart of FlushWithChecksumHandoff1 in which this test adds nothing to
// options.checksum_handoff_file_types. The same checksum-type switch and
// data-corruption injection are applied, but every flush is expected to
// succeed.
TEST_F(DBFlushTest, FlushWithChecksumHandoff2) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  auto* const sync_points = SyncPoint::GetInstance();

  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  Reopen(options);

  // Scenario 1: checksum type switched at flush start; flush still succeeds.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(Flush());

  sync_points->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  sync_points->EnableProcessing();
  ASSERT_OK(Flush());
  sync_points->DisableProcessing();
  Destroy(options);
  Reopen(options);

  // Scenario 2: corruption injected at flush start; flush still succeeds.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(Flush());

  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  sync_points->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->IngestDataCorruptionBeforeWrite();
  });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  sync_points->EnableProcessing();
  ASSERT_OK(Flush());
  sync_points->DisableProcessing();
  Destroy(options);
}
// Checksum handoff is enabled for descriptor (MANIFEST) files. Switching the
// fault-injection FS to kxxHash at "VersionSet::LogAndApply:WriteManifest"
// must make the flush end with severity kFatalError.
TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest1) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  auto* const sync_points = SyncPoint::GetInstance();

  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  Reopen(options);

  // Baseline flush with a consistent checksum type.
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("key3", "value3"));
  sync_points->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
      });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  sync_points->EnableProcessing();
  Status flush_status = Flush();
  ASSERT_EQ(flush_status.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  sync_points->DisableProcessing();
  Destroy(options);
}
// Checksum handoff is enabled for descriptor (MANIFEST) files. Injecting data
// corruption at "VersionSet::LogAndApply:WriteManifest" must make the flush
// end with severity kFatalError.
TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  auto* const sync_points = SyncPoint::GetInstance();

  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  Reopen(options);

  // Baseline flush without checksum verification in the fault-injection FS.
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(Flush());

  // Turn verification on, then corrupt the data at manifest-write time.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  sync_points->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  sync_points->EnableProcessing();
  Status flush_status = Flush();
  ASSERT_EQ(flush_status.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  sync_points->DisableProcessing();
  Destroy(options);
}
// Flushes a second column family while a sync-point callback writes to it and
// switches its memtable mid-flush, then checks that the flush job picked
// exactly one memtable and that its ID is 1.
TEST_F(DBFlushTest, PickRightMemtables) {
Options options = CurrentOptions();
DestroyAndReopen(options);
options.create_if_missing = true;
const std::string test_cf_name = "test_cf";
options.max_write_buffer_number = 128;
CreateColumnFamilies({test_cf_name}, options);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);
// One entry in each column family.
ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// At this sync point, add another write to the test column family and
// switch its memtable, so a newer memtable exists when memtables are picked.
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedWals:BeforeReLock", [&](void* ) {
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
auto* cfhi =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
assert(cfhi);
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
});
// The flush job must have picked a single memtable, the one with ID 1.
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
auto* job = static_cast<FlushJob*>(arg);
assert(job);
const auto& mems = job->GetMemTables();
assert(mems.size() == 1);
assert(mems[0]);
ASSERT_EQ(1, mems[0]->GetID());
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> {
public:
DBFlushTestBlobError() : sync_point_(GetParam()) {}
std::string sync_point_;
};
// Runs every DBFlushTestBlobError test once per sync point listed below; the
// string becomes the fixture's sync_point_ member.
INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileBuilder::WriteBlobToFile:AddRecord",
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
// Injects an IOError at the parameterized blob-file sync point and checks
// that the failed flush leaves neither table files nor blob files behind, in
// the version or on disk, and that the flush statistics match the point of
// failure.
TEST_P(DBFlushTestBlobError, FlushError) {
  Options options;
  options.enable_blob_files = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  Reopen(options);

  ASSERT_OK(Put("key", "blob"));

  auto* const sync_points = SyncPoint::GetInstance();
  // Overwrite the status handed to the sync point with an IOError.
  sync_points->SetCallBack(sync_point_, [this](void* arg) {
    Status* const injected_status = static_cast<Status*>(arg);
    assert(injected_status);
    (*injected_status) = Status::IOError(sync_point_);
  });
  sync_points->EnableProcessing();

  ASSERT_NOK(Flush());

  sync_points->DisableProcessing();
  sync_points->ClearAllCallBacks();

  // The current version must reference no L0 table file and no blob file.
  VersionSet* const version_set = dbfull()->GetVersionSet();
  assert(version_set);
  ColumnFamilyData* const default_cfd =
      version_set->GetColumnFamilySet()->GetDefault();
  assert(default_cfd);
  Version* const current_version = default_cfd->current();
  assert(current_version);
  const VersionStorageInfo* const vstorage = current_version->storage_info();
  assert(vstorage);

  const auto& level0_files = vstorage->LevelFiles(0);
  ASSERT_TRUE(level0_files.empty());
  const auto& blob_metas = vstorage->GetBlobFiles();
  ASSERT_TRUE(blob_metas.empty());

  // The DB directory must not contain table or blob files either. Names
  // that ParseFileName does not recognize are ignored.
  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));
  for (const auto& child : children) {
    uint64_t file_number = 0;
    FileType file_type = kTableFile;
    if (ParseFileName(child, &file_number, &file_type)) {
      ASSERT_NE(file_type, kTableFile);
      ASSERT_NE(file_type, kBlobFile);
    }
  }

  const InternalStats* const stats = default_cfd->internal_stats();
  assert(stats);
  const auto& comp_stats = stats->TEST_GetCompactionStats();
  ASSERT_FALSE(comp_stats.empty());

  const bool failed_on_add_record =
      (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord");
  if (!failed_on_add_record) {
    // Failure at the other sync point: table bytes and one table output are
    // expected in the stats, but no blob output.
    ASSERT_GT(comp_stats[0].bytes_written, 0);
    ASSERT_EQ(comp_stats[0].bytes_written_blob, 0);
    ASSERT_EQ(comp_stats[0].num_output_files, 1);
    ASSERT_EQ(comp_stats[0].num_output_files_blob, 0);
  } else {
    // Failure at AddRecord: nothing is accounted for.
    ASSERT_EQ(comp_stats[0].bytes_written, 0);
    ASSERT_EQ(comp_stats[0].bytes_written_blob, 0);
    ASSERT_EQ(comp_stats[0].num_output_files, 0);
    ASSERT_EQ(comp_stats[0].num_output_files_blob, 0);
  }

  const uint64_t* const cf_stats = stats->TEST_GetCFStatsValue();
  ASSERT_EQ(cf_stats[InternalStats::BYTES_FLUSHED],
            comp_stats[0].bytes_written + comp_stats[0].bytes_written_blob);
}
// From inside OnFlushBegin, deletes "foo", takes a snapshot, re-inserts "foo"
// and switches the memtable. After the flush, a read of "foo" at that
// snapshot must return NotFound.
TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
// Listener that performs the delete / snapshot / put / memtable-switch
// sequence when a flush of the default column family (cf_id 0) begins.
class SimpleTestFlushListener : public EventListener {
public:
explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
~SimpleTestFlushListener() override = default;
void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);
ASSERT_OK(db->Delete(WriteOptions(), "foo"));
// The snapshot is taken after the delete and before the re-insert below.
snapshot_ = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
auto* dbimpl = static_cast_with_check<DBImpl>(db);
assert(dbimpl);
ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
assert(cfhi);
ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
}
// Stored by the constructor; not read anywhere in this listener.
DBFlushTest* test_ = nullptr;
// Snapshot taken in OnFlushBegin; released by the test body.
const Snapshot* snapshot_ = nullptr;
};
Options options = CurrentOptions();
options.create_if_missing = true;
auto* listener = new SimpleTestFlushListener(this);
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));
// Snapshot object constructed right after the initial write and held until
// the end of the test.
ManagedSnapshot snapshot_guard(db_.get());
ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
// This flush triggers the listener's OnFlushBegin above.
ASSERT_OK(db_->Flush(FlushOptions(), default_cf));
const Snapshot* snapshot = listener->snapshot_;
assert(snapshot);
ReadOptions read_opts;
read_opts.snapshot = snapshot;
{
// At the listener's snapshot "foo" was deleted, so the read must miss.
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
ASSERT_TRUE(s.IsNotFound());
}
db_->ReleaseSnapshot(snapshot);
}
// Parameterized on atomic_flush. With two-phase commit enabled, prepares two
// transactions, commits only the second, manually flushes the touched column
// families, and checks after a reopen that the minimum log number to keep is
// non-zero.
TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.allow_2pc = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  auto flush_listener = std::make_shared<FlushCounterListener>();
  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
  options.listeners.push_back(flush_listener);

  // Replace the fixture's DB with a freshly opened TransactionDB.
  Close();
  Destroy(options, true);

  TransactionDBOptions txn_db_opts;
  txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
  TransactionDB* txn_db = nullptr;
  ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
  ASSERT_NE(txn_db, nullptr);
  db_.reset(txn_db);

  std::vector<std::string> cfs = {"puppy", "kitty"};
  CreateColumnFamilies(cfs, options);
  ASSERT_EQ(handles_.size(), 2);
  ASSERT_EQ(handles_[0]->GetName(), cfs[0]);
  ASSERT_EQ(handles_[1]->GetName(), cfs[1]);

  // With atomic flush both column families are written and flushed,
  // otherwise only the first one.
  const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1;

  WriteOptions wopts;
  TransactionOptions txn_opts;
  Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  ASSERT_NE(txn1, nullptr);
  ASSERT_NE(txn2, nullptr);
  for (size_t cf = 0; cf < kNumCfToFlush; cf++) {
    ASSERT_OK(txn1->Put(handles_[cf], "k1", "v1"));
    ASSERT_OK(txn2->Put(handles_[cf], "k2", "v2"));
  }

  // Both transactions are prepared; only txn2 is committed.
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn2->SetName("txn2"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_OK(txn2->Prepare());
  ASSERT_OK(txn2->Commit());

  delete txn1;
  delete txn2;

  // Before the flush: nothing pending in the immutable list, and the active
  // memtables hold data.
  for (size_t cf = 0; cf < kNumCfToFlush; cf++) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[cf])->cfd();
    ASSERT_EQ(0, cfd->imm()->NumNotFlushed());
    ASSERT_FALSE(cfd->mem()->IsEmpty());
  }

  std::vector<ColumnFamilyHandle*> cfs_to_flush;
  cfs_to_flush.reserve(kNumCfToFlush);
  for (size_t cf = 0; cf < kNumCfToFlush; cf++) {
    cfs_to_flush.push_back(handles_[cf]);
  }
  ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush));

  // After the flush: still nothing pending, and the active memtables are
  // empty.
  for (size_t cf = 0; cf < kNumCfToFlush; cf++) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[cf])->cfd();
    ASSERT_EQ(0, cfd->imm()->NumNotFlushed());
    ASSERT_TRUE(cfd->mem()->IsEmpty());
  }

  // Reopen with all column families and check the 2PC log bookkeeping.
  cfs.push_back(kDefaultColumnFamilyName);
  ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  ASSERT_TRUE(dbfull()->allow_2pc());
  ASSERT_NE(dbfull()->MinLogNumberToKeep(), 0);
}
// Writes one key to each of three column families with the WAL disabled,
// manually flushes all of them in one call, and checks that every active
// memtable is empty afterwards with no immutable memtable left unflushed.
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  auto flush_listener = std::make_shared<FlushCounterListener>();
  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
  options.listeners.push_back(flush_listener);
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  WriteOptions wopts;
  wopts.disableWAL = true;
  // Populate every column family and remember its index for the flush call.
  std::vector<int> cf_ids;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    const int cf_id = static_cast<int>(idx);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
    cf_ids.push_back(cf_id);
  }

  // Pre-flush state: data sits in the active memtable only.
  for (ColumnFamilyHandle* handle : handles_) {
    auto* cfh_impl = static_cast<ColumnFamilyHandleImpl*>(handle);
    ASSERT_EQ(0, cfh_impl->cfd()->imm()->NumNotFlushed());
    ASSERT_FALSE(cfh_impl->cfd()->mem()->IsEmpty());
  }

  ASSERT_OK(Flush(cf_ids));

  // Post-flush state: every memtable has been drained.
  for (ColumnFamilyHandle* handle : handles_) {
    auto* cfh_impl = static_cast<ColumnFamilyHandleImpl*>(handle);
    ASSERT_EQ(0, cfh_impl->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh_impl->cfd()->mem()->IsEmpty());
  }
}
// Checks PrecomputeMinLogNumberToKeepNon2PC() in two situations:
//  1. only the default column family was flushed: the result must equal the
//     log number of the column family that was not flushed;
//  2. all column families were flushed: the result must equal the smallest
//     log number among them, which must also be the current log number.
TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  CreateAndReopenWithCF({"pikachu"}, options);

  const size_t num_cfs = handles_.size();
  ASSERT_EQ(num_cfs, 2);
  WriteOptions wopts;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "key", "value", wopts));
  }

  // Case 1: flush only column family 0.
  {
    std::vector<int> only_default{0};
    ASSERT_OK(Flush(only_default));

    auto* flushed = static_cast<ColumnFamilyHandleImpl*>(handles_[0]);
    autovector<ColumnFamilyData*> flushed_cfds;
    flushed_cfds.push_back(flushed->cfd());
    autovector<autovector<VersionEdit*>> flush_edits;
    flush_edits.push_back({});

    auto* unflushed = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);
    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
                                                 flushed_cfds, flush_edits),
              unflushed->cfd()->GetLogNumber());
  }

  // Case 2: flush every column family.
  {
    std::vector<int> all_cf_ids;
    for (size_t idx = 0; idx != num_cfs; ++idx) {
      all_cf_ids.push_back(static_cast<int>(idx));
    }
    ASSERT_OK(Flush(all_cf_ids));
    const uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber();

    uint64_t smallest_log_number = std::numeric_limits<uint64_t>::max();
    autovector<ColumnFamilyData*> flushed_cfds;
    autovector<autovector<VersionEdit*>> flush_edits;
    for (ColumnFamilyHandle* handle : handles_) {
      auto* cfh_impl = static_cast<ColumnFamilyHandleImpl*>(handle);
      flushed_cfds.push_back(cfh_impl->cfd());
      flush_edits.push_back({});
      smallest_log_number =
          std::min(smallest_log_number, cfh_impl->cfd()->GetLogNumber());
    }
    ASSERT_EQ(smallest_log_number, log_num_after_flush);
    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
                                                 flushed_cfds, flush_edits),
              smallest_log_number);
  }
}
// Fills the last column family's small (4 KiB) write buffer to trigger an
// automatic flush, then inspects the *other* column families: with atomic
// flush their memtables must have been flushed too (empty), without it they
// must still hold their data.
TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = 4096;

  // Hold the checks below until a background flush reports completion.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
        "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "key", "value", wopts));
  }

  // Keep writing to the last column family only.
  const int last_cf = static_cast<int>(num_cfs) - 1;
  for (int n = 0; n != 4000; ++n) {
    ASSERT_OK(Put(last_cf, "key" + std::to_string(n),
                  "value" + std::to_string(n), wopts));
  }

  TEST_SYNC_POINT(
      "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");

  // Examine every column family except the last one.
  for (size_t idx = 0; idx + 1 != num_cfs; ++idx) {
    auto* cfh_impl = static_cast<ColumnFamilyHandleImpl*>(handles_[idx]);
    ASSERT_EQ(0, cfh_impl->cfd()->imm()->NumNotFlushed());
    if (options.atomic_flush) {
      ASSERT_TRUE(cfh_impl->cfd()->mem()->IsEmpty());
    } else {
      ASSERT_FALSE(cfh_impl->cfd()->mem()->IsEmpty());
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
}
// Atomic-flush only. Starts a non-blocking flush of three column families,
// deactivates the filesystem once some flush jobs have completed, and checks
// that waiting on each column family reports an error and that every column
// family still holds exactly one unflushed immutable memtable.
TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
  const bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_unique<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.env = fault_injection_env.get();

  // Test point ":1" waits for "SomeFlushJobsComplete:1"; the flush code then
  // waits at "SomeFlushJobsComplete:2" until test point ":2" is passed.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
        "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
       {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "key", "value", wopts));
  }

  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));

  // Break the filesystem in the window between the two sync points.
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");

  for (ColumnFamilyHandle* handle : handles_) {
    ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(handle));
  }
  for (ColumnFamilyHandle* handle : handles_) {
    auto* cfh_impl = static_cast<ColumnFamilyHandleImpl*>(handle);
    ASSERT_EQ(1, cfh_impl->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh_impl->cfd()->mem()->IsEmpty());
  }

  fault_injection_env->SetFilesystemActive(true);
  Destroy(options);
}
// Atomic-flush only. Drops one of three column families and then requests a
// flush that still names it; the flush must report IsColumnFamilyDropped().
TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
  const bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;

  // Start from a clean sync-point state.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  const size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  WriteOptions wopts;
  wopts.disableWAL = true;
  std::vector<int> all_cf_ids;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    const int cf_id = static_cast<int>(idx);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
    all_cf_ids.push_back(cf_id);
  }

  // Drop "pikachu" first, then ask to flush all three.
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_TRUE(Flush(all_cf_ids).IsColumnFamilyDropped());
  Destroy(options);
}
// Atomic-flush only. A helper thread drops column family 1 after the atomic
// flush has been scheduled but before the background flush starts. The flush
// must still succeed, all keys must remain readable through the existing
// handles, and the two surviving column families must be readable after a
// reopen.
TEST_P(DBAtomicFlushTest,
       FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
  const bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);

  // Ordering: schedule flush -> drop CF -> background flush starts.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::BeforeDropCF"},
       {"DBAtomicFlushTest::AfterDropCF",
        "DBImpl::BackgroundCallFlush:start"}});
  SyncPoint::GetInstance()->EnableProcessing();

  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "key", "value", wopts));
  }

  port::Thread dropper([&]() {
    TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
  });

  FlushOptions flush_opts;
  flush_opts.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  dropper.join();

  // All three handles are still open, so every key is still readable.
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_EQ("value", Get(static_cast<int>(idx), "key"));
  }

  // Reopen with only the column families that were not dropped.
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
  num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  for (size_t idx = 0; idx != num_cfs; ++idx) {
    ASSERT_EQ("value", Get(static_cast<int>(idx), "key"));
  }
  Destroy(options);
}
// Atomic-flush only. Uses a memtable factory configured with a threshold of
// four keys, writes four keys plus one more, closes the DB right away, and
// checks after reopening that the last write is still readable.
TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
  const bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  const int kNumKeysTriggerFlush = 4;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int n = 0; n != kNumKeysTriggerFlush; ++n) {
    const std::string suffix = std::to_string(n);
    ASSERT_OK(Put(0, "key" + suffix, "value" + suffix));
  }

  // Reset sync-point state before the final write.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(0, "key", "value"));
  Close();

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ("value", Get(0, "key"));
}
// With background work paused, queues a non-blocking flush of both column
// families, a second non-blocking flush of the default one, and then drops
// and destroys the second column family. Once background work resumes, the
// flush of the default column family must complete without error.
TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
  const bool atomic_flush = GetParam();
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 2;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(Put(0, "key00", "value00"));
  ASSERT_OK(Put(1, "key10", "value10"));

  FlushOptions no_wait_flush;
  no_wait_flush.wait = false;
  ASSERT_OK(dbfull()->Flush(no_wait_flush, handles_));
  ASSERT_OK(Put(0, "key01", "value01"));
  ASSERT_OK(dbfull()->Flush(no_wait_flush));

  // Remove "pikachu" while both flush requests are still pending.
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  handles_[1] = nullptr;

  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));

  // Release the remaining handle by hand and empty handles_, since slot 1
  // was already destroyed above.
  delete handles_[0];
  handles_.clear();
}
// Atomic-flush only. Races a column family drop (including deletion of its
// handle) against an atomic flush that is about to wait for the background
// flush. The flush must still return OK.
TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
CreateAndReopenWithCF({"pikachu"}, options);
SyncPoint::GetInstance()->DisableProcessing();
// Enforced ordering (each pair {A, B} makes B wait for A):
//   1. the flush is scheduled           -> the drop thread may proceed;
//   2. the drop thread freed the handle -> the background flush may start;
//   3. the background flush has started -> the foreground thread may begin
//      waiting for it.
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
{"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BackgroundCallFlush:start",
"DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(2, handles_.size());
ASSERT_OK(Put(0, "key", "value"));
ASSERT_OK(Put(1, "key", "value"));
// Capture the raw ColumnFamilyData pointers up front: the handle for
// "pikachu" is deleted by the drop thread below.
auto* cfd_default =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
->cfd();
auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
port::Thread drop_cf_thr([&]() {
TEST_SYNC_POINT(
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
delete handles_[1];
// Shrink handles_ so the deleted handle is not referenced again.
handles_.resize(1);
TEST_SYNC_POINT(
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
});
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
flush_opts));
drop_cf_thr.join();
Close();
SyncPoint::GetInstance()->DisableProcessing();
}
// Atomic-flush only. Deactivates the filesystem from a sync-point callback
// fired just before the last version edit is written to the manifest, and
// expects the manual flush of both column families to fail.
TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
  const bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  for (size_t idx = 0; idx < handles_.size(); ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "a", "value"));
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
  SyncPoint::GetInstance()->EnableProcessing();

  FlushOptions flush_opts;
  const Status flush_status = db_->Flush(flush_opts, handles_);
  ASSERT_NOK(flush_status);

  // Restore the filesystem so the DB can be closed.
  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Deactivates the filesystem right before memtables are switched for an
// automatically scheduled flush. Waiting for the flush itself returns OK,
// but the next write to the DB must fail.
TEST_P(DBAtomicFlushTest, FailureInMultiCfAutomaticFlush) {
  const bool atomic_flush = GetParam();
  const int kNumKeysTriggerFlush = 4;
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);

  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  // One key per column family.
  for (size_t idx = 0; idx < handles_.size(); ++idx) {
    ASSERT_OK(Put(static_cast<int>(idx), "a", "value"));
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::ScheduleFlushes:PreSwitchMemtable",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
  SyncPoint::GetInstance()->EnableProcessing();

  // Top the default column family up to the configured key threshold.
  for (int n = 1; n < kNumKeysTriggerFlush; ++n) {
    const std::string suffix = std::to_string(n);
    ASSERT_OK(Put(0, "key" + suffix, "value" + suffix));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_NOK(Put(0, "x", "y"));

  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Atomic-flush only. Two background flush threads are involved: the first
// one ("thr1") hits an injected manifest-sync IOError, and while it is
// writing the manifest it triggers a second flush that is picked up by
// "thr2". The callbacks below check what thr2 observes at the WaitToCommit
// sync point, and the foreground flush must return an IOError.
TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.atomic_flush = true;
options.env = fault_injection_env.get();
options.max_background_jobs = 8;
options.max_write_buffer_number = 8;
CreateAndReopenWithCF({"pikachu"}, options);
assert(2 == handles_.size());
WriteOptions write_opts;
write_opts.disableWAL = true;
ASSERT_OK(Put(0, "a", "v_0_a", write_opts));
ASSERT_OK(Put(1, "a", "v_1_a", write_opts));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// thr1 may not write the last version edit until thr2 has reached
// WaitToCommit.
SyncPoint::GetInstance()->LoadDependency({
{"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
});
// Thread ids are assigned in the order the background flush callbacks run:
// the first thread to start becomes thr1, the next one thr2.
std::thread::id bg_flush_thr1, bg_flush_thr2;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void*) {
if (bg_flush_thr1 == std::thread::id()) {
bg_flush_thr1 = std::this_thread::get_id();
} else if (bg_flush_thr2 == std::thread::id()) {
bg_flush_thr2 = std::this_thread::get_id();
}
});
// Counts how many times thr2 has passed through WaitToCommit.
int called = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) {
if (std::this_thread::get_id() == bg_flush_thr2) {
// The sync point argument is treated as a (Status, bool) pair.
const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
assert(ptr);
if (0 == called) {
// First pass: status is OK and the flag is set.
ASSERT_OK(ptr->first);
ASSERT_TRUE(ptr->second);
} else if (1 == called) {
// Second pass: the IOError is visible and the flag is cleared.
ASSERT_TRUE(ptr->first.IsIOError());
ASSERT_FALSE(ptr->second);
}
++called;
TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
[&](void*) {
if (std::this_thread::get_id() == bg_flush_thr1) {
TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
}
});
// While thr1 is about to write the manifest, write one more key and request
// a non-blocking flush of the default column family.
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void*) {
if (std::this_thread::get_id() != bg_flush_thr1) {
return;
}
ASSERT_OK(db_->Put(write_opts, "b", "v_1_b"));
FlushOptions flush_opts;
flush_opts.wait = false;
std::vector<ColumnFamilyHandle*> cfhs(1, db_->DefaultColumnFamily());
ASSERT_OK(dbfull()->Flush(flush_opts, cfhs));
});
// Inject the failure: overwrite the IOStatus passed to the sync point after
// the manifest sync.
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
auto* ptr = static_cast<IOStatus*>(arg);
assert(ptr);
*ptr = IOStatus::IOError("Injected failure");
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// With background work paused and a one-key memtable factory, writes
// max_write_buffer_number keys and then starts one more write on a separate
// thread. Once that write has reached DelayWrite, a non-waiting flush with
// allow_write_stall must return TryAgain.
TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.max_write_buffer_number = 2;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  Reopen(options);

  // The test thread resumes only after the extra writer entered DelayWrite.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::DelayWrite:Start",
        "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->PauseBackgroundWork());
  for (int n = 0; n < options.max_write_buffer_number; ++n) {
    const std::string suffix = std::to_string(n);
    ASSERT_OK(Put("k" + suffix, "v" + suffix));
  }

  std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });
  TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");

  {
    FlushOptions no_wait_flush;
    no_wait_flush.wait = false;
    no_wait_flush.allow_write_stall = true;
    ASSERT_TRUE(db_->Flush(no_wait_flush).IsTryAgain());
  }

  // Let background flushes run so the extra writer can finish.
  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  stalled_writer.join();
  SyncPoint::GetInstance()->DisableProcessing();
}
// Instantiate every DBFlushDirectIOTest case once per bool parameter value
// (false and true).
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());
// Instantiate every DBAtomicFlushTest case for both parameter values; the
// tests in this file feed the parameter into options.atomic_flush.
INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
// Non-atomic flush. Injects a retryable IOError into the first table write
// and holds that flush job until another background flush job has finished.
// After error recovery reports success, exactly one L0 file must exist.
TEST_F(DBFlushTest, NonAtomicFlushRollbackPendingFlushes) {
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);

  std::atomic_int flush_count{0};
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        // Only the very first table write is made to fail.
        if (flush_count.fetch_add(1) != 0) {
          return;
        }
        auto* status = static_cast<Status*>(s_ptr);
        IOStatus injected = IOStatus::IOError("injected foobar");
        injected.SetRetryable(true);
        *status = injected;
        TEST_SYNC_POINT("Let mem1 flush start");
        TEST_SYNC_POINT("Wait for mem1 flush to finish");
      });
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let mem1 flush start", "Mem1 flush starts"},
       {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Mem1 flush starts");
  ASSERT_OK(Put(Key(3), "val3"));
  TEST_SYNC_POINT("Wait for error recover");
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
// Non-atomic flush. The first table write fails with an injected retryable
// IOError. A later flush job (the one whose first memtable has ID 3) is held
// while writing its table until "RollbackMemtableFlush" is reached. After
// error recovery succeeds, exactly one L0 file must exist.
TEST_F(DBFlushTest, AbortNonAtomicFlushWhenBGError) {
Options opts = CurrentOptions();
opts.atomic_flush = false;
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
opts.max_write_buffer_number = 64;
opts.max_background_flushes = 4;
env_->SetBackgroundThreads(4, Env::HIGH);
DestroyAndReopen(opts);
std::atomic_int flush_count = 0;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
int c = flush_count.fetch_add(1);
// Only the first invocation injects the error and drives the sequence of
// sync points below.
if (c == 0) {
Status* s = (Status*)(s_ptr);
IOStatus io_error = IOStatus::IOError("injected foobar");
io_error.SetRetryable(true);
*s = io_error;
TEST_SYNC_POINT("Let mem1 flush start");
TEST_SYNC_POINT("Wait for mem1 flush to finish");
TEST_SYNC_POINT("Let mem2 flush start");
TEST_SYNC_POINT("Wait for mem2 to start writing table");
}
});
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&](void* mems) {
autovector<MemTable*>* mems_ptr = (autovector<MemTable*>*)mems;
// Identify the flush job of interest by the ID of its first memtable.
if ((*mems_ptr)[0]->GetID() == 3) {
TEST_SYNC_POINT("Mem2 flush starts writing table");
TEST_SYNC_POINT("Mem2 flush waits until rollback");
}
});
// Each pair {A, B} makes sync point B wait until A has been reached.
SyncPoint::GetInstance()->LoadDependency(
{{"Let mem1 flush start", "Mem1 flush starts"},
{"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
{"Let mem2 flush start", "Mem2 flush starts"},
{"Mem2 flush starts writing table",
"Wait for mem2 to start writing table"},
{"RollbackMemtableFlush", "Mem2 flush waits until rollback"},
{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Put(Key(2), "val2"));
TEST_SYNC_POINT("Mem1 flush starts");
ASSERT_OK(Put(Key(3), "val3"));
TEST_SYNC_POINT("Mem2 flush starts");
ASSERT_OK(Put(Key(4), "val4"));
TEST_SYNC_POINT("Wait for error recover");
ASSERT_EQ(1, NumTableFilesAtLevel(0));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
// Non-atomic flush with a single background flush allowed. The first table
// write fails with an injected retryable IOError. While recovery is held
// back, no L0 file may exist; once recovery is released and succeeds there
// must be exactly one L0 file, and the table-write sync point must have
// fired exactly twice.
TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 1;
  env_->SetBackgroundThreads(2, Env::HIGH);
  DestroyAndReopen(opts);

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  std::atomic_int flush_write_table_count{0};
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        // Fail only the first table write.
        if (flush_write_table_count.fetch_add(1) != 0) {
          return;
        }
        auto* status = static_cast<Status*>(s_ptr);
        IOStatus injected = IOStatus::IOError("injected foobar");
        injected.SetRetryable(true);
        *status = injected;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  // Recovery may not start until the test releases it.
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let error recovery start",
        "RecoverFromRetryableBGIOError:BeforeStart"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});

  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Put(Key(2), "val2"));
  // The outcome of this wait is deliberately not checked.
  dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError();
  ASSERT_OK(Put(Key(3), "val3"));
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  TEST_SYNC_POINT("Let error recovery start");
  TEST_SYNC_POINT("Wait for error recover");
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  ASSERT_EQ(2, flush_write_table_count);

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
// Atomic flush. The first table write fails with an injected retryable
// IOError and that flush job is held until another atomic flush job reaches
// its WaitCV sync point. While error recovery is parked between
// "BeforeStart" and "BeforeStart2", waiting for background work must report
// an error and no L0 file may exist. After recovery is released and
// succeeds, background work must finish cleanly with exactly one L0 file.
TEST_F(DBFlushTest, DBStuckAfterAtomicFlushError) {
  Options opts = CurrentOptions();
  opts.atomic_flush = true;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);

  std::atomic_int flush_count = 0;
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        // Only the first table write is made to fail.
        if (c == 0) {
          Status* s = static_cast<Status*>(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let flush for mem1 start");
          TEST_SYNC_POINT("Wait for flush for mem1");
        }
      });
  // Each pair {A, B} makes sync point B wait until A has been reached.
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let flush for mem1 start", "Flush for mem1"},
       {"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV",
        "Wait for flush for mem1"},
       {"RecoverFromRetryableBGIOError:BeforeStart",
        "Wait for resume to start"},
       {"Recovery should continue here",
        "RecoverFromRetryableBGIOError:BeforeStart2"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Flush for mem1");
  ASSERT_OK(Put(Key(3), "val3"));
  TEST_SYNC_POINT("Wait for resume to start");
  ASSERT_OK(Put(Key(4), "val4"));

  // Recovery is parked: the background error is still visible.
  ASSERT_NOK(dbfull()->TEST_WaitForBackgroundWork());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  TEST_SYNC_POINT("Recovery should continue here");
  TEST_SYNC_POINT("Wait for error recover");
  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // Unregister the callback (it captures the local flush_count by reference)
  // and stop sync-point processing so nothing leaks past this test, matching
  // the cleanup done by the sibling BG-error tests.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
// Exercises flush_verify_memtable_count. For each table format:
//  - a flush covering merge, put, single-delete, delete and range-delete
//    entries must succeed;
//  - with a sync-point callback that sets the table builder's "skip" flag,
//    both a reopen and a manual flush must fail with a Corruption status
//    that carries the expected message.
TEST_F(DBFlushTest, VerifyOutputRecordCount) {
  for (bool use_plain_table : {false, true}) {
    Options options = CurrentOptions();
    options.flush_verify_memtable_count = true;
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
    DestroyAndReopen(options);
    if (use_plain_table) {
      options.table_factory.reset(NewPlainTableFactory());
    }

    // Flush without interference: must succeed.
    ASSERT_OK(Merge("k0", "v1"));
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    ASSERT_OK(SingleDelete("k2"));
    ASSERT_OK(Delete("k2"));
    ASSERT_OK(Delete("k3"));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), "k1", "k3"));
    ASSERT_OK(Flush());

    // Set the "skip" flag in the builder that matches the table format.
    DestroyAndReopen(options);
    const char* skip_sync_point = use_plain_table
                                      ? "PlainTableBuilder::Add::skip"
                                      : "BlockBasedTableBuilder::Add::skip";
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        skip_sync_point,
        [&](void* skip) { *static_cast<bool*>(skip) = true; });
    SyncPoint::GetInstance()->EnableProcessing();

    const char* expect =
        "Number of keys in flush output SST files does not match";

    // Failure surfaced while reopening.
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    Status s = TryReopen(options);
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_TRUE(std::strstr(s.getState(), expect));

    // Failure surfaced by a manual flush.
    DestroyAndReopen(options);
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    s = Flush();
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_TRUE(std::strstr(s.getState(), expect));

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
// Fixture for the super-block alignment tests. The parameter tuple is
// (block_align, super_block_alignment_size,
//  super_block_alignment_space_overhead_ratio).
class DBFlushSuperBlockTest
    : public DBFlushTest,
      public ::testing::WithParamInterface<std::tuple<bool, size_t, size_t>> {
 public:
  DBFlushSuperBlockTest() : DBFlushTest() {}

  // Returns `i` as a decimal string zero-padded to ten characters.
  std::string formatKey(int i) {
    constexpr int kKeyWidth = 10;
    char buf[64];
    snprintf(buf, sizeof(buf), "%0*d", kKeyWidth, i);
    return std::string(buf);
  }

  // Point-reads keys [0, key_count) and compares each value with the one
  // recorded in added_data.
  void VerifyReadWithGet(int key_count) {
    for (int idx = 0; idx < key_count; ++idx) {
      const std::string key = formatKey(idx);
      PinnableSlice value;
      ASSERT_OK(Get(key, &value));
      ASSERT_EQ(value.ToString(), added_data[key]);
    }
  }

  // Scans the whole DB with an iterator, checking that keys come back as
  // formatKey(0), formatKey(1), ... with the recorded values and that exactly
  // key_count entries are seen.
  void VerifyReadWithIterator(int key_count) {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    int seen = 0;
    it->SeekToFirst();
    while (it->Valid()) {
      ASSERT_OK(it->status());
      const std::string expected_key = formatKey(seen);
      ASSERT_EQ((it->key()).ToString(), expected_key);
      ASSERT_EQ((it->value()).ToString(), added_data[expected_key]);
      seen++;
      it->Next();
    }
    ASSERT_OK(it->status());
    ASSERT_EQ(seen, key_count);
  }

 protected:
  // Fixed seed for the generated values.
  Random rnd{123};
  // Key -> value written by the test; consulted by the Verify* helpers.
  std::unordered_map<std::string, std::string> added_data;
};
// Value used for super_block_alignment_space_overhead_ratio in the test
// instantiation below. When the SuperBlock test runs with this ratio (and
// super-block alignment enabled without block_align), it expects the
// "padding bytes exceed limit" sync point to fire at least once.
constexpr size_t kLowSpaceOverheadRatio = 256;
// Writes 12345 keys with random values under the parameterized block-based
// table options, counts how often the super-block alignment sync points
// fire, and checks that all data can be read back via Get and via an
// iterator and that file checksums verify. It then toggles
// super_block_alignment_size and checks that the existing files are still
// readable.
TEST_P(DBFlushSuperBlockTest, SuperBlock) {
  constexpr int key_count = 12345;
  const auto& param = GetParam();

  Options options;
  options.env = env_;
  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  options.paranoid_file_checks = true;
  options.write_buffer_size = 1024 * 1024;

  BlockBasedTableOptions block_options;
  block_options.block_align = std::get<0>(param);
  block_options.index_block_restart_interval = 3;
  block_options.super_block_alignment_size = std::get<1>(param);
  block_options.super_block_alignment_space_overhead_ratio =
      std::get<2>(param);
  options.table_factory.reset(NewBlockBasedTableFactory(block_options));
  if (block_options.block_align) {
    // Compression is switched off whenever block_align is requested.
    options.compression = kNoCompression;
  }
  ASSERT_OK(options.table_factory->ValidateOptions(
      DBOptions(options), ColumnFamilyOptions(options)));
  Reopen(options);

  // Count how often the builder reaches each alignment sync point.
  int super_block_pad_count = 0;
  int super_block_pad_exceed_limit_count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
      "SuperBlockAlignment",
      [&super_block_pad_count](void* /*arg*/) { super_block_pad_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
      "SuperBlockAlignmentPaddingBytesExceedLimit",
      [&super_block_pad_exceed_limit_count](void* /*arg*/) {
        super_block_pad_exceed_limit_count++;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  for (int idx = 0; idx < key_count; ++idx) {
    added_data[formatKey(idx)] =
        std::string(rnd.RandomString(rnd.Next() % 1000));
    ASSERT_OK(Put(formatKey(idx), added_data[formatKey(idx)]));
  }
  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // The alignment point is expected to fire only when a super-block
  // alignment size is configured and block_align is off.
  const bool super_block_padding_expected =
      block_options.super_block_alignment_size != 0 &&
      !block_options.block_align;
  if (super_block_padding_expected) {
    ASSERT_GT(super_block_pad_count, 0);
  } else {
    ASSERT_EQ(super_block_pad_count, 0);
  }
  if (super_block_padding_expected &&
      block_options.super_block_alignment_space_overhead_ratio ==
          kLowSpaceOverheadRatio) {
    ASSERT_GT(super_block_pad_exceed_limit_count, 0);
  }

  VerifyReadWithGet(key_count);
  Reopen(options);
  VerifyReadWithIterator(key_count);
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));

  // Flip the alignment setting and make sure the files written above can
  // still be read with the other configuration.
  block_options.super_block_alignment_size =
      (block_options.super_block_alignment_size == 0) ? 16 * 1024 : 0;
  options.table_factory.reset(NewBlockBasedTableFactory(block_options));
  Reopen(options);
  VerifyReadWithGet(key_count);
  Reopen(options);
  VerifyReadWithIterator(key_count);
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
}
// Run DBFlushSuperBlockTest over the cross product of:
//   block_align                                 in {false, true}
//   super_block_alignment_size                  in {0, 32 KiB, 16 KiB}
//   super_block_alignment_space_overhead_ratio  in {4, kLowSpaceOverheadRatio}
// i.e. 2 * 3 * 2 = 12 parameter combinations.
INSTANTIATE_TEST_CASE_P(
SuperBlockTests, DBFlushSuperBlockTest,
testing::Combine(testing::Bool(), testing::Values(0, 32 * 1024, 16 * 1024),
testing::Values(4, kLowSpaceOverheadRatio)));
}
// Test binary entry point: installs the stack-trace handler, lets GoogleTest
// consume its command-line flags, and runs every registered test.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}