#include "db/db_test_util.h"
#include "db/db_with_timestamp_test_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#include "util/defer.h"
#include "util/udt_util.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
// Base fixture for the WAL tests. On POSIX platforms it additionally exposes
// helpers for probing fallocate() support and for measuring how much disk
// space a file occupies.
class DBWALTestBase : public DBTestBase {
 protected:
  // `dir_name` is forwarded to DBTestBase together with `true` as its second
  // constructor argument.
  explicit DBWALTestBase(const std::string& dir_name)
      : DBTestBase(dir_name, true) {}

#if defined(ROCKSDB_PLATFORM_POSIX)
 public:
#if defined(ROCKSDB_FALLOCATE_PRESENT)
  // Probes whether fallocate() works in the test directory: creates a scratch
  // file under dbname_, preallocates one byte, then removes the file again.
  // Returns false (after logging to stderr) when fallocate() fails with
  // ENOSYS or EOPNOTSUPP, and true otherwise.
  bool IsFallocateSupported() {
    const std::string fname_test_fallocate =
        dbname_ + "/preallocate_testfile";
    int fd = -1;
    do {
      fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC,
                0644);
    } while (fd < 0 && errno == EINTR);
    // 0 is a valid descriptor; only a negative value signals failure.
    assert(fd >= 0);
    const int alloc_status = fallocate(fd, 0, 0, 1);
    // errno is only meaningful when the call failed. Reading it after a
    // successful call could pick up a stale ENOSYS/EOPNOTSUPP left behind by
    // an unrelated earlier call.
    const int err_number = (alloc_status != 0) ? errno : 0;
    close(fd);
    // Perform the deletion outside of assert() so that it still happens when
    // assertions are compiled out (NDEBUG).
    const bool deleted = env_->DeleteFile(fname_test_fallocate).ok();
    assert(deleted);
    (void)deleted;
    if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
      fprintf(stderr, "Skipped preallocated space check: %s\n",
              errnoStr(err_number).c_str());
      return false;
    }
    assert(alloc_status == 0);
    return true;
  }
#endif

  // Returns the allocated size of `file_name`, computed as the st_blocks
  // value reported by stat() multiplied by 512.
  uint64_t GetAllocatedFileSize(std::string file_name) {
    struct stat sbuf;
    int err = stat(file_name.c_str(), &sbuf);
    assert(err == 0);
    (void)err;  // only consumed by the assert above
    return sbuf.st_blocks * 512;
  }
#endif
};
// Concrete fixture used by the TEST_F(DBWALTest, ...) cases in this file. It
// adds nothing to DBWALTestBase beyond fixing the directory name.
class DBWALTest : public DBWALTestBase {
public:
// Passes "/db_wal_test" to DBWALTestBase as the directory name.
DBWALTest() : DBWALTestBase("/db_wal_test") {}
};
// A SpecialEnv that records how WAL files are deleted and re-read. The first
// WAL deletion request is swallowed (the file is left in place) and its name
// remembered, so a test can later check whether that file gets opened again.
class EnrichedSpecialEnv : public SpecialEnv {
 public:
  explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}

  // Opens `f` through SpecialEnv. Before doing so, if `f` is the WAL whose
  // deletion was skipped, sets `deleted_wal_reopened`; if in addition its name
  // does not sort after `largest_deleted_wal`, sets `gap_in_wals`.
  Status NewSequentialFile(const std::string& f,
                           std::unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    InstrumentedMutexLock l(&env_mutex_);
    if (f == skipped_wal) {
      deleted_wal_reopened = true;
      if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
          f.compare(largest_deleted_wal) <= 0) {
        gap_in_wals = true;
      }
    }
    return SpecialEnv::NewSequentialFile(f, r, soptions);
  }

  // Deletes `fname` through SpecialEnv, except for the first WAL it is asked
  // to delete: that one is kept on disk, remembered in `skipped_wal`, and
  // reported as successfully deleted. Repeated requests for the same WAL are
  // skipped again. Every WAL request increments `deleted_wal_cnt`; WALs that
  // are really deleted update `largest_deleted_wal`.
  Status DeleteFile(const std::string& fname) override {
    if (IsWAL(fname)) {
      deleted_wal_cnt++;
      InstrumentedMutexLock l(&env_mutex_);
      if (skipped_wal.size() != 0 && skipped_wal != fname) {
        if (largest_deleted_wal.size() == 0 ||
            largest_deleted_wal.compare(fname) < 0) {
          largest_deleted_wal = fname;
        }
      } else {
        skipped_wal = fname;
        return Status::OK();
      }
    }
    return SpecialEnv::DeleteFile(fname);
  }

  // Returns true when `fname` ends in "log". Names shorter than the suffix
  // are never WALs; the length check also keeps `size() - 3` from wrapping
  // around, which would make std::string::compare throw std::out_of_range.
  bool IsWAL(const std::string& fname) {
    constexpr size_t kSuffixLen = 3;
    return fname.size() >= kSuffixLen &&
           fname.compare(fname.size() - kSuffixLen, kSuffixLen, "log") == 0;
  }

  // Held while reading or writing the two strings below.
  InstrumentedMutex env_mutex_;
  // Name of the WAL whose deletion was swallowed; empty until that happens.
  std::string skipped_wal;
  // Lexicographically largest name among WALs that were really deleted.
  std::string largest_deleted_wal;
  // Number of DeleteFile() calls that targeted a WAL (skipped ones included).
  std::atomic<size_t> deleted_wal_cnt = {0};
  // Set once the skipped WAL is opened for sequential reading.
  std::atomic<bool> deleted_wal_reopened = {false};
  // Set when the skipped WAL is reopened and does not sort after the largest
  // deleted WAL.
  std::atomic<bool> gap_in_wals = {false};
};
// Fixture that runs the database on top of an EnrichedSpecialEnv so that a
// test can inspect the WAL deletion/re-read state that env records.
class DBWALTestWithEnrichedEnv : public DBTestBase {
public:
// Wraps the target of the base fixture's env in a new EnrichedSpecialEnv,
// reopens the DB on it with allow_2pc enabled, and then installs it as env_.
DBWALTestWithEnrichedEnv()
: DBTestBase("db_wal_test", true) {
enriched_env_ = new EnrichedSpecialEnv(env_->target());
auto options = CurrentOptions();
options.env = enriched_env_;
options.allow_2pc = true;
Reopen(options);
// The DB has been reopened on enriched_env_, so the env object created by
// the base fixture is released here and env_ is re-pointed below.
delete env_;
// NOTE(review): enriched_env_ is never deleted in this class; presumably
// DBTestBase releases it through env_ -- confirm.
env_ = enriched_env_;
}
protected:
// Points at the same object as env_ after construction, kept with its
// concrete type so tests can read the recorded WAL state.
EnrichedSpecialEnv* enriched_env_;
};
// Checks that a WAL whose deletion was swallowed by EnrichedSpecialEnv is not
// opened again when the DB is reopened, and that no WAL gap is recorded.
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
auto options = last_options_;
// Very small write buffer; presumably chosen so the writes below force
// frequent memtable switches and therefore WAL turnover.
options.write_buffer_size = 128;
Reopen(options);
// NOTE(review): assuming each LoadDependency pair is {predecessor,
// successor}, the AfterFlush sync point below waits until
// DBImpl::PurgeObsoleteFiles has reached its End point.
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::PurgeObsoleteFiles:End",
"DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"}});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions writeOpt = WriteOptions();
// 640 overwrites of the same key.
for (int i = 0; i < 128 * 5; i++) {
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
}
FlushOptions fo;
fo.wait = true;
ASSERT_OK(db_->Flush(fo));
TEST_SYNC_POINT("DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush");
// At least one WAL deletion was requested, and one of them was skipped.
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
ASSERT_NE(0, enriched_env_->skipped_wal.size());
// Reopening must neither read the skipped WAL nor record a gap.
options = last_options_;
Reopen(options);
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
ASSERT_FALSE(enriched_env_->gap_in_wals);
SyncPoint::GetInstance()->DisableProcessing();
}
// Mixes writes with the WAL enabled and disabled on column family 1 and
// checks, after each reopen, that the latest value of every key is readable.
TEST_F(DBWALTest, WAL) {
  const std::vector<std::string> all_cfs = {"default", "pikachu"};

  // Writes key/value to column family 1 with the WAL switched off or on.
  const auto put_cf1 = [this](bool disable_wal, const Slice& key,
                              const Slice& value) {
    WriteOptions write_options = WriteOptions();
    write_options.disableWAL = disable_wal;
    return dbfull()->Put(write_options, handles_[1], key, value);
  };

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    // Round 1: both writes bypass the WAL.
    ASSERT_OK(put_cf1(true, "foo", "v1"));
    ASSERT_OK(put_cf1(true, "bar", "v1"));
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    // Round 2: WAL write first, then a write that bypasses the WAL.
    ASSERT_OK(put_cf1(false, "bar", "v2"));
    ASSERT_OK(put_cf1(true, "foo", "v2"));
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ("v2", Get(1, "foo"));

    // Round 3: write that bypasses the WAL first, then a WAL write.
    ASSERT_OK(put_cf1(true, "bar", "v3"));
    ASSERT_OK(put_cf1(false, "foo", "v3"));
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));
  } while (ChangeWalOptions());
}
// Reopens the database many times in a row, with a write in between, for
// every WAL option configuration.
TEST_F(DBWALTest, RollLog) {
  constexpr int kReopenRounds = 10;
  const std::vector<std::string> all_cfs = {"default", "pikachu"};

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Put(1, "baz", "v5"));

    // One reopen, followed by a burst of further reopens.
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    for (int round = 0; round != kReopenRounds; ++round) {
      ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    }

    ASSERT_OK(Put(1, "foo", "v4"));
    for (int round = 0; round != kReopenRounds; ++round) {
      ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    }
  } while (ChangeWalOptions());
}
// Runs DB::SyncWAL() on a second thread and, while that call is held between
// two sync points, performs writes and a non-waiting flush on the main thread.
// All five keys must be readable afterwards.
TEST_F(DBWALTest, SyncWALNotBlockWrite) {
Options options = CurrentOptions();
options.max_write_buffer_number = 4;
DestroyAndReopen(options);
ASSERT_OK(Put("foo1", "bar1"));
ASSERT_OK(Put("foo5", "bar5"));
// NOTE(review): assuming each LoadDependency pair is {predecessor,
// successor}: the main thread resumes at ":1" only after SyncWithoutFlush
// reached its point 1, and SyncWithoutFlush continues past its point 2 only
// after the main thread reached ":2".
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"WritableFileWriter::SyncWithoutFlush:1",
"DBWALTest::SyncWALNotBlockWrite:1"},
{"DBWALTest::SyncWALNotBlockWrite:2",
"WritableFileWriter::SyncWithoutFlush:2"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
// Writes and a flush request issued between the two sync points.
ASSERT_OK(Put("foo2", "bar2"));
ASSERT_OK(Put("foo3", "bar3"));
FlushOptions fo;
// Do not wait for the flush to finish.
fo.wait = false;
ASSERT_OK(db_->Flush(fo));
ASSERT_OK(Put("foo4", "bar4"));
TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
thread.join();
ASSERT_EQ(Get("foo1"), "bar1");
ASSERT_EQ(Get("foo2"), "bar2");
ASSERT_EQ(Get("foo3"), "bar3");
ASSERT_EQ(Get("foo4"), "bar4");
ASSERT_EQ(Get("foo5"), "bar5");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Issues a Put on a second thread while the main thread calls DB::SyncWAL(),
// then checks that both the earlier and the concurrent write are readable.
TEST_F(DBWALTest, SyncWALNotWaitWrite) {
ASSERT_OK(Put("foo1", "bar1"));
ASSERT_OK(Put("foo3", "bar3"));
// The "DBWALTest::SyncWALNotWaitWrite:1"/":2" points named here are not
// triggered anywhere in this test body.
// NOTE(review): presumably they are raised from inside the SyncWAL
// implementation -- confirm.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"SpecialEnv::SpecialWalFile::Append:1",
"DBWALTest::SyncWALNotWaitWrite:1"},
{"DBWALTest::SyncWALNotWaitWrite:2",
"SpecialEnv::SpecialWalFile::Append:2"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Concurrent writer.
ROCKSDB_NAMESPACE::port::Thread thread(
[&]() { ASSERT_OK(Put("foo2", "bar2")); });
ASSERT_OK(db_->SyncWAL());
thread.join();
ASSERT_EQ(Get("foo1"), "bar1");
ASSERT_EQ(Get("foo2"), "bar2");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Writes to column family 1, reopens the database, and verifies that the
// recovered contents match what was written, for every WAL configuration.
TEST_F(DBWALTest, Recover) {
  const std::vector<std::string> all_cfs = {"default", "pikachu"};

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Put(1, "baz", "v5"));

    // First recovery: both keys must come back. "foo" is read twice.
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v5", Get(1, "baz"));

    ASSERT_OK(Put(1, "bar", "v2"));
    ASSERT_OK(Put(1, "foo", "v3"));

    // Second recovery: the overwrite of "foo" must be visible, and further
    // writes must work on top of the recovered state.
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v4"));
    ASSERT_EQ("v4", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ("v5", Get(1, "baz"));
  } while (ChangeWalOptions());
}
// Parameterized fixture for WAL recovery tests on column families that use
// user-defined timestamps. The parameter is a UserDefinedTimestampTestMode.
class DBWALTestWithTimestamp
    : public DBBasicTestWithTimestampBase,
      public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
 public:
  DBWALTestWithTimestamp()
      : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}

  // Destroys and recreates the DB, creates the column families `cfs` with
  // `ts_options`, and then reopens everything through
  // ReopenColumnFamiliesWithTs(). Returns the status of that reopen.
  Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
                               const Options& ts_options, bool persist_udt,
                               bool avoid_flush_during_recovery = false) {
    Options base_options = CurrentOptions();
    base_options.allow_concurrent_memtable_write = persist_udt;
    DestroyAndReopen(base_options);
    CreateColumnFamilies(cfs, ts_options);
    return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
                                      avoid_flush_during_recovery);
  }

  // Closes the DB and tries to reopen it with the default column family
  // (using CurrentOptions() adjusted by the arguments) followed by every
  // column family in `cfs` (using `ts_options`). Neither set of options is
  // allowed to create a missing DB.
  Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
                                    Options ts_options, bool persist_udt,
                                    bool avoid_flush_during_recovery = false) {
    Options base_options = CurrentOptions();
    base_options.create_if_missing = false;
    base_options.allow_concurrent_memtable_write = persist_udt;
    base_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
    ts_options.create_if_missing = false;

    // Default column family first, then the caller's column families.
    std::vector<std::string> all_cf_names;
    all_cf_names.reserve(cfs.size() + 1);
    all_cf_names.push_back(kDefaultColumnFamilyName);
    all_cf_names.insert(all_cf_names.end(), cfs.begin(), cfs.end());

    std::vector<Options> all_cf_options;
    all_cf_options.reserve(cfs.size() + 1);
    all_cf_options.push_back(base_options);
    all_cf_options.insert(all_cf_options.end(), cfs.size(), ts_options);

    Close();
    return TryReopenWithColumnFamilies(all_cf_names, all_cf_options);
  }

  // Writes `key` -> `value` at timestamp `ts` into column family index `cf`
  // using default write options.
  Status Put(uint32_t cf, const Slice& key, const Slice& ts,
             const Slice& value) {
    return db_->Put(WriteOptions(), handles_[cf], key, ts, value);
  }

  // Reads `key` from column family index `cf` and asserts that the read
  // succeeds and yields exactly `expected_value` and `expected_ts`.
  void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
                const std::string& expected_value,
                const std::string& expected_ts) {
    std::string found_value;
    std::string found_ts;
    ASSERT_OK(db_->Get(read_opts, handles_[cf], key, &found_value, &found_ts));
    ASSERT_EQ(expected_value, found_value);
    ASSERT_EQ(expected_ts, found_ts);
  }
};
// Recovery with avoid_flush_during_recovery = true: after every reopen the
// "pikachu" column family must have no SST files, and timestamped reads must
// return the expected versions.
TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
std::string ts1;
PutFixed64(&ts1, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
bool avoid_flush_during_recovery = true;
std::string full_history_ts_low;
ReadOptions read_opts;
do {
// read_opts.timestamp points at ts_slice, so reassigning ts_slice further
// down changes the timestamp used by subsequent reads.
Slice ts_slice = ts1;
read_opts.timestamp = &ts_slice;
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"), 0U);
ASSERT_OK(Put(1, "foo", ts1, "v1"));
ASSERT_OK(Put(1, "baz", ts1, "v5"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"), 0U);
CheckGet(read_opts, 1, "foo", "v1", ts1);
CheckGet(read_opts, 1, "baz", "v5", ts1);
std::string ts2;
PutFixed64(&ts2, 2);
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"), 0U);
std::string ts3;
PutFixed64(&ts3, 3);
ASSERT_OK(Put(1, "foo", ts3, "v4"));
// Versions written so far:
//   "foo" -> (ts1, "v1"), (ts2, "v3"), (ts3, "v4")
//   "bar" -> (ts2, "v2")
//   "baz" -> (ts1, "v5")
// Read at ts1: "bar" has no version yet.
CheckGet(read_opts, 1, "foo", "v1", ts1);
std::string value;
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
CheckGet(read_opts, 1, "baz", "v5", ts1);
// Read at ts2.
ts_slice = ts2;
CheckGet(read_opts, 1, "foo", "v3", ts2);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
// Read at ts3.
ts_slice = ts3;
CheckGet(read_opts, 1, "foo", "v4", ts3);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
// The test expects full_history_ts_low to remain unset.
ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
ASSERT_TRUE(full_history_ts_low.empty());
} while (ChangeWalOptions());
}
// Recovery with the default avoid_flush_during_recovery (false) passed by the
// fixture helpers: the reopen is expected to leave exactly one SST file in
// "pikachu", whose key bounds and full_history_ts_low depend on whether
// timestamps are persisted.
TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
std::string min_ts;
std::string write_ts;
PutFixed64(&min_ts, 0);
PutFixed64(&write_ts, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"), 0U);
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
// One SST file is expected after the reopen.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"), 1U);
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
std::string full_history_ts_low;
ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
ASSERT_GT(level_to_files.size(), 1);
// Level 0 holds exactly one file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt) {
// Timestamps persisted: file bounds carry the write timestamp and
// full_history_ts_low stays unset.
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
ASSERT_TRUE(full_history_ts_low.empty());
} else {
// Timestamps not persisted: file bounds carry the minimum timestamp and
// full_history_ts_low equals the cutoff derived from the write timestamp.
ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
std::string effective_cutoff;
Slice write_ts_slice = write_ts;
GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff);
ASSERT_EQ(effective_cutoff, full_history_ts_low);
}
}
// Runs every TEST_P(DBWALTestWithTimestamp, ...) once per listed mode:
// kStripUserDefinedTimestamp and kNormal.
INSTANTIATE_TEST_CASE_P(
P, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));
// Switches the "pikachu" column family from a plain bytewise comparator to a
// timestamp-aware one and back again across reopens, checking that data
// written before each switch is still readable afterwards.
TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
Options options;
options.create_if_missing = true;
// Phase 1: no user-defined timestamps.
options.comparator = BytewiseComparator();
bool avoid_flush_during_recovery = true;
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true ,
avoid_flush_during_recovery));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));
// Phase 2: reopen with a u64-timestamp comparator, timestamps not persisted.
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
false ,
avoid_flush_during_recovery));
std::string ts;
PutFixed64(&ts, 0);
// Slice constructed from ts while it holds the encoding of 0; read_opts
// refers to this slice for all reads below.
Slice ts_slice = ts;
ReadOptions read_opts;
read_opts.timestamp = &ts_slice;
// Entries written without a timestamp are expected to read back with
// timestamp 0.
CheckGet(read_opts, 1, "foo", "v1", ts);
CheckGet(read_opts, 1, "baz", "v5", ts);
// Re-encode ts as 1 and write new versions at that timestamp.
// NOTE(review): ts_slice was taken before ts was cleared and rewritten;
// whether it now observes the new contents depends on Slice semantics --
// confirm.
ts.clear();
PutFixed64(&ts, 1);
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
CheckGet(read_opts, 1, "foo", "v2", ts);
CheckGet(read_opts, 1, "baz", "v6", ts);
// Phase 3: back to the plain comparator; the newest values must be visible
// through timestamp-less reads.
options.comparator = BytewiseComparator();
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
true ,
avoid_flush_during_recovery));
ASSERT_EQ("v2", Get(1, "foo"));
ASSERT_EQ("v6", Get(1, "baz"));
}
// After recovery, checks that the number of table files is as expected and
// that table reader handles are present or absent depending on the
// max_open_files setting used for the reopen.
TEST_F(DBWALTest, RecoverWithTableHandle) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.avoid_flush_during_recovery = false;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Two explicit flushes, then one more write that is left unflushed.
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Put(1, "bar", "v2"));
ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, "foo", "v3"));
ASSERT_OK(Put(1, "bar", "v4"));
ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, "big", std::string(100, 'a')));
options = CurrentOptions();
const int kSmallMaxOpenFiles = 13;
// Pick max_open_files based on the current option configuration.
if (option_config_ == kDBLogDir) {
options.max_open_files = kSmallMaxOpenFiles;
// The callback writes kSmallMaxOpenFiles back through the sync point's
// argument.
// NOTE(review): presumably this undoes an adjustment made by
// SanitizeOptions -- confirm.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = kSmallMaxOpenFiles;
});
} else if (option_config_ == kWalDirAndMmapReads) {
options.max_open_files = 100;
} else {
options.max_open_files = -1;
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ReopenWithColumnFamilies({"default", "pikachu"}, options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
// Count table files across all levels; three are expected.
size_t total_files = 0;
for (const auto& level : files) {
total_files += level.size();
}
ASSERT_EQ(total_files, 3);
// With the small limit no file may hold a table reader handle; otherwise
// every file must.
for (const auto& level : files) {
for (const auto& file : level) {
if (options.max_open_files == kSmallMaxOpenFiles) {
ASSERT_TRUE(file.table_reader_handle == nullptr);
} else {
ASSERT_TRUE(file.table_reader_handle != nullptr);
}
}
}
} while (ChangeWalOptions());
}
// Writes one short and one long value, then reopens with blob files enabled
// and flush-on-recovery allowed. Verifies the resulting table file, blob file
// and flush statistics.
TEST_F(DBWALTest, RecoverWithBlob) {
// min_blob_size is the option value used at reopen; the static_asserts pin
// the short value below it and the long value at or above it.
constexpr uint64_t min_blob_size = 10;
constexpr char short_value[] = "short";
static_assert(sizeof(short_value) - 1 < min_blob_size,
"short_value too long");
constexpr char long_value[] = "long_value";
static_assert(sizeof(long_value) - 1 >= min_blob_size,
"long_value too short");
ASSERT_OK(Put("key1", short_value));
ASSERT_OK(Put("key2", long_value));
// Before the reopen: no non-empty levels and no blob files.
{
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
}
// Reopen with blob files enabled.
Options options;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
ASSERT_EQ(Get("key1"), short_value);
ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
// Exactly one L0 table file and one blob file are expected.
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_EQ(l0_files.size(), 1);
const FileMetaData* const table_file = l0_files[0];
ASSERT_NE(table_file, nullptr);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);
const auto& blob_file = blob_files.front();
ASSERT_NE(blob_file, nullptr);
// The table file covers both keys and sequence numbers 1..2, and refers to
// the blob file; the blob file holds a single blob.
ASSERT_EQ(table_file->smallest.user_key(), "key1");
ASSERT_EQ(table_file->largest.user_key(), "key2");
ASSERT_EQ(table_file->fd.smallest_seqno, 1);
ASSERT_EQ(table_file->fd.largest_seqno, 2);
ASSERT_EQ(table_file->oldest_blob_file_number,
blob_file->GetBlobFileNumber());
ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
// Statistics for level 0 must account for the table file and the blob file.
const InternalStats* const internal_stats = cfd->internal_stats();
ASSERT_NE(internal_stats, nullptr);
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
ASSERT_EQ(compaction_stats[0].bytes_written_blob,
blob_file->GetTotalBlobBytes());
ASSERT_EQ(compaction_stats[0].num_output_files, 1);
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
// BYTES_FLUSHED equals table bytes plus blob bytes.
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
compaction_stats[0].bytes_written +
compaction_stats[0].bytes_written_blob);
}
// Writes 64 values of 4 KiB each, then reopens with blob files enabled and a
// 64 KiB write buffer. Expects recovery to produce more than one L0 file and
// the same number of blob files.
TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
// 1 << 12 bytes per value.
std::string large_value(1 << 12, 'a');
constexpr int num_keys = 64;
for (int i = 0; i < num_keys; ++i) {
ASSERT_OK(Put(Key(i), large_value));
}
// Before the reopen: no non-empty levels and no blob files.
{
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
}
Options options;
// 64 KiB write buffer (a quarter of the 256 KiB written above) and blob
// files enabled.
options.write_buffer_size = 1 << 16; options.enable_blob_files = true;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
// Every value must survive recovery.
for (int i = 0; i < num_keys; ++i) {
ASSERT_EQ(Get(Key(i)), large_value);
}
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
// Multiple L0 files, multiple blob files, and equal counts of both.
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_GT(l0_files.size(), 1);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_GT(blob_files.size(), 1);
ASSERT_EQ(l0_files.size(), blob_files.size());
}
// Exercises WAL writes with checksum handoff enabled on a fault-injection
// file system: matching checksum types must succeed, while a mismatching
// checksum type or injected data corruption must make WAL writes fail.
// The whole body is compiled out when ROCKSDB_ASSERT_STATUS_CHECKED is set.
TEST_F(DBWALTest, WALWithChecksumHandoff) {
#ifndef ROCKSDB_ASSERT_STATUS_CHECKED
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  do {
    Options options = CurrentOptions();
    options.checksum_handoff_file_types.Add(FileType::kWalFile);
    options.env = fault_fs_env.get();
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Round 1: both writes bypass the WAL.
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    // Round 2: one WAL write and one write that bypasses the WAL.
    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ("v2", Get(1, "foo"));

    // Round 3: after switching the handoff type to kxxHash the WAL write of
    // "foo" is expected to fail, while the earlier writes must survive.
    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3"));
    ASSERT_OK(dbfull()->SyncWAL());
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
    writeOpt.disableWAL = false;
    ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_NE("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "zoo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    // Round 4: matching checksum type but corruption injected before the
    // write; the WAL write is expected to fail and neither "v4" value may be
    // visible after the reopen.
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
    fault_fs->IngestDataCorruptionBeforeWrite();
    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4"));
    writeOpt.disableWAL = false;
    ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_NE("v4", Get(1, "foo"));
    ASSERT_NE("v4", Get(1, "bar"));
    fault_fs->NoDataCorruptionBeforeWrite();

    // Round 5: with the handoff type set to kNoChecksum both writes succeed.
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5"));
    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5"));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v5", Get(1, "foo"));
    ASSERT_EQ("v5", Get(1, "bar"));
    Destroy(options);
  } while (ChangeWalOptions());
#endif
}
// Checks DB::LockWAL()/UnlockWAL(): while the WAL is locked a no_slowdown
// write returns Incomplete and a non-waiting flush returns TryAgain; after
// unlocking, writes succeed again.
TEST_F(DBWALTest, LockWal) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "v"));
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(db_->LockWAL());
// A write that refuses to stall must come back as Incomplete.
WriteOptions wopts;
wopts.no_slowdown = true;
Status s = db_->Put(wopts, "foo", "dontcare");
ASSERT_TRUE(s.IsIncomplete());
// Listing WAL files still works while locked.
{
VectorLogPtr wals;
ASSERT_OK(db_->GetSortedWalFiles(wals));
ASSERT_FALSE(wals.empty());
}
// A default-options flush started on another thread; it is joined only
// after UnlockWAL() below and is expected to succeed.
port::Thread worker([&]() {
Status tmp_s = db_->Flush(FlushOptions());
ASSERT_OK(tmp_s);
});
// A non-waiting flush on this thread is expected to report TryAgain.
FlushOptions flush_opts;
flush_opts.wait = false;
s = db_->Flush(flush_opts);
ASSERT_TRUE(s.IsTryAgain());
ASSERT_OK(db_->UnlockWAL());
ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
worker.join();
} while (ChangeWalOptions());
}
// Parameterized DBWALTest fixture whose parameter is the name of the sync
// point at which the test injects an error.
class DBRecoveryTestBlobError
: public DBWALTest,
public testing::WithParamInterface<std::string> {
public:
// Copies the test parameter into sync_point_.
DBRecoveryTestBlobError() : sync_point_(GetParam()) {}
// Name of the sync point used for error injection.
std::string sync_point_;
};
// Runs the DBRecoveryTestBlobError tests once for each of the two
// BlobFileBuilder::WriteBlobToFile sync points listed below.
INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileBuilder::WriteBlobToFile:AddRecord",
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
// Injects an IOError at the parameterized sync point while the DB is being
// reopened with blob files enabled. The reopen must fail and must leave
// neither table files nor blob files in the DB directory.
TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
  ASSERT_OK(Put("key", "blob"));

  // Overwrite the Status handed to the sync point with an IOError.
  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
    Status* const injected = static_cast<Status*>(arg);
    assert(injected);
    *injected = Status::IOError(sync_point_);
  });
  SyncPoint::GetInstance()->EnableProcessing();

  Options options;
  options.enable_blob_files = true;
  options.avoid_flush_during_recovery = false;
  options.disable_auto_compactions = true;
  options.env = env_;
  ASSERT_NOK(TryReopen(options));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Inspect what is left in the DB directory; names that do not parse as DB
  // files are ignored.
  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));
  for (const auto& child : children) {
    uint64_t file_number = 0;
    FileType file_type = kTableFile;
    if (ParseFileName(child, &file_number, &file_type)) {
      ASSERT_NE(file_type, kTableFile);
      ASSERT_NE(file_type, kBlobFile);
    }
  }
}
// Backs up the WAL files after a few merges and then copies them back into
// the WAL directory in several situations. Replaying logs that were already
// recovered must not apply the merges a second time.
TEST_F(DBWALTest, IgnoreRecoveredLog) {
  const std::string backup_logs = dbname_ + "/backup_logs";

  do {
    // Start every round with an empty backup directory.
    ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
    std::vector<std::string> stale_backups;
    ASSERT_OK(env_->GetChildren(backup_logs, &stale_backups));
    for (const auto& stale : stale_backups) {
      ASSERT_OK(env_->DeleteFile(backup_logs + "/" + stale));
    }

    Options options = CurrentOptions();
    options.create_if_missing = true;
    options.merge_operator = MergeOperators::CreateUInt64AddOperator();
    options.wal_dir = dbname_ + "/logs";
    DestroyAndReopen(options);

    // "foo" is merged twice (expected total 2), "bar" once (expected 1).
    std::string one, two;
    PutFixed64(&one, 1);
    PutFixed64(&two, 2);
    ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
    ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
    ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));

    // Save a copy of every file currently in the WAL directory.
    std::vector<std::string> logs;
    ASSERT_OK(env_->GetChildren(options.wal_dir, &logs));
    for (const auto& log : logs) {
      CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
    }

    // Copies every saved log from the backup directory into the WAL dir.
    const auto restore_logs = [&]() {
      for (const auto& log : logs) {
        CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
      }
    };

    // Plain recovery.
    Reopen(options);
    ASSERT_EQ(two, Get("foo"));
    ASSERT_EQ(one, Get("bar"));
    Close();

    // Recovery with the old logs copied back in: values must be unchanged.
    restore_logs();
    Reopen(options);
    ASSERT_EQ(two, Get("foo"));
    ASSERT_EQ(one, Get("bar"));
    Close();

    // Destroy, create a fresh DB, then bring the old logs back.
    Destroy(options);
    Reopen(options);
    Close();
    ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
    restore_logs();
    Reopen(options);
    ASSERT_EQ(two, Get("foo"));
    ASSERT_EQ(one, Get("bar"));

    // Destroy again and restore the logs without creating a DB first (the
    // backups are consumed here); this reopen is expected to fail.
    Destroy(options);
    ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
    for (const auto& log : logs) {
      CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
      ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log));
    }
    const Status reopen_status = TryReopen(options);
    ASSERT_NOK(reopen_status);
    Destroy(options);
  } while (ChangeWalOptions());
}
// Reopens twice in a row without writing in between, then checks that a
// later write is still recovered correctly.
TEST_F(DBWALTest, RecoveryWithEmptyLog) {
  const std::vector<std::string> all_cfs = {"default", "pikachu"};

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Put(1, "foo", "v2"));

    // Two back-to-back reopens with no writes between them.
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());

    ASSERT_OK(Put(1, "foo", "v3"));
    ReopenWithColumnFamilies(all_cfs, CurrentOptions());
    ASSERT_EQ("v3", Get(1, "foo"));
  } while (ChangeWalOptions());
}
#if !(defined NDEBUG) || !defined(OS_WIN)
// Checks the WAL preallocation size reported at the
// "DBTestWalFile.GetPreallocationStatus" sync point under four successive
// option settings. In every phase the callback must fire exactly twice.
TEST_F(DBWALTest, PreallocateBlock) {
  const char* const kStatusPoint = "DBTestWalFile.GetPreallocationStatus";

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1000 * 1000;
  options.max_total_wal_size = 0;

  // Phase 1 expectation: write_buffer_size plus one tenth of it.
  size_t expected_preallocation_size = static_cast<size_t>(
      options.write_buffer_size + options.write_buffer_size / 10);

  DestroyAndReopen(options);

  std::atomic<int> called(0);

  // Shared by all phases: compares the reported size with the currently
  // expected one (read by reference) and counts the invocation.
  const auto verify_preallocation = [&](void* arg) {
    ASSERT_TRUE(arg != nullptr);
    const size_t preallocation_size = *(static_cast<size_t*>(arg));
    ASSERT_EQ(expected_preallocation_size, preallocation_size);
    called.fetch_add(1);
  };

  SyncPoint::GetInstance()->SetCallBack(kStatusPoint, verify_preallocation);
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("", ""));
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(2, called.load());

  // Phase 2: max_total_wal_size set; it is the expected size.
  options.max_total_wal_size = 1000 * 1000;
  expected_preallocation_size =
      static_cast<size_t>(options.max_total_wal_size);
  Reopen(options);
  called.store(0);
  SyncPoint::GetInstance()->SetCallBack(kStatusPoint, verify_preallocation);
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("", ""));
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(2, called.load());

  // Phase 3: db_write_buffer_size set; it is the expected size.
  options.db_write_buffer_size = 800 * 1000;
  expected_preallocation_size =
      static_cast<size_t>(options.db_write_buffer_size);
  Reopen(options);
  called.store(0);
  SyncPoint::GetInstance()->SetCallBack(kStatusPoint, verify_preallocation);
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("", ""));
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(2, called.load());

  // Phase 4: a WriteBufferManager sized 700 * 1000 is installed; that size
  // is the expected one.
  expected_preallocation_size = 700 * 1000;
  const auto write_buffer_manager =
      std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
  options.write_buffer_manager = write_buffer_manager;
  Reopen(options);
  called.store(0);
  SyncPoint::GetInstance()->SetCallBack(kStatusPoint, verify_preallocation);
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("", ""));
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(2, called.load());
}
#endif
// Checks that a manually driven purge of obsolete files leaves the first WAL
// on disk when WAL recycling is enabled (recycle_log_file_num = 2). The
// scenario runs twice: first with the WAL under the DB directory, then with a
// separate wal_dir.
TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
  for (int pass = 0; pass < 2; ++pass) {
    const bool separate_wal_dir = (pass != 0);

    Options opts = CurrentOptions();
    opts.create_if_missing = true;
    opts.recycle_log_file_num = 2;
    opts.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
    if (separate_wal_dir) {
      opts.wal_dir = alternative_wal_dir_;
    }
    DestroyAndReopen(opts);

    // Capture the WAL list while the first write is still only in the WAL.
    ASSERT_OK(Put("foo", "v1"));
    VectorLogPtr wals_before_flush;
    ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before_flush));
    ASSERT_GT(wals_before_flush.size(), 0);
    ASSERT_OK(Flush());

    // Collect and purge obsolete files by hand. FindObsoleteFiles is called
    // with the DB mutex held, PurgeObsoleteFiles without it.
    JobContext purge_context(0);
    dbfull()->TEST_LockMutex();
    dbfull()->FindObsoleteFiles(&purge_context, true);
    dbfull()->TEST_UnlockMutex();
    dbfull()->PurgeObsoleteFiles(purge_context);
    purge_context.Clean();

    // The first WAL recorded before the flush must have survived the purge.
    const auto first_wal_number = wals_before_flush[0]->LogNumber();
    if (separate_wal_dir) {
      ASSERT_OK(env_->FileExists(
          LogFileName(alternative_wal_dir_, first_wal_number)));
    } else {
      ASSERT_OK(env_->FileExists(LogFileName(dbname_, first_wal_number)));
    }
  }
}
// Races EnableFileDeletions() against WAL creation while a WAL is about to be
// reused (recycle_log_file_num = 1). Sync-point dependencies place the
// EnableFileDeletions() call between the two
// "DBImpl::CreateWAL:BeforeReuseWritableFile{1,2}" points. Runs once with the
// default WAL location and once with a separate wal_dir.
TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
  auto* const sync_points = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();

  for (int pass = 0; pass < 2; ++pass) {
    Options opts = CurrentOptions();
    opts.recycle_log_file_num = 1;
    opts.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
    if (pass != 0) {
      opts.wal_dir = alternative_wal_dir_;
    }
    DestroyAndReopen(opts);

    // First write + flush, before any ordering constraints are installed.
    ASSERT_OK(Put("foo", "bar"));
    ASSERT_OK(Flush());

    // Ordering enforced below:
    //   BeforeReuseWritableFile1 -> PreFullPurge
    //   PostFullPurge            -> BeforeReuseWritableFile2
    sync_points->LoadDependency({
        {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
         "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
        {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
         "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
    });
    sync_points->EnableProcessing();

    // Helper thread that enables file deletions inside the window defined by
    // the two test-side sync points.
    ROCKSDB_NAMESPACE::port::Thread purge_thread([&]() {
      TEST_SYNC_POINT(
          "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
      ASSERT_OK(db_->EnableFileDeletions());
      TEST_SYNC_POINT(
          "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
    });

    // Second write + flush runs concurrently with the helper thread.
    ASSERT_OK(Put("foo", "bar"));
    ASSERT_OK(Flush());
    purge_thread.join();
  }
}
// GetSortedWalFiles() must report no WAL for a freshly (re)opened DB and
// exactly one WAL after the first write. Repeated for every WAL option
// configuration that ChangeWalOptions() cycles through.
TEST_F(DBWALTest, GetSortedWalFiles) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    VectorLogPtr wals;

    // Nothing written yet.
    ASSERT_OK(dbfull()->GetSortedWalFiles(wals));
    ASSERT_EQ(0, wals.size());

    // One write into column family 1.
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(dbfull()->GetSortedWalFiles(wals));
    ASSERT_EQ(1, wals.size());
  } while (ChangeWalOptions());
}
// Exercises DB::GetCurrentWalFile(): a null output pointer is rejected, and
// the returned descriptor is checked on a fresh DB, after a few writes, and
// again after a series of reopens followed by more writes. Repeated for every
// configuration produced by ChangeWalOptions().
TEST_F(DBWALTest, GetCurrentWalFile) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    // A null out-parameter must yield a non-OK status.
    std::unique_ptr<LogFile>* null_out = nullptr;
    ASSERT_NOK(dbfull()->GetCurrentWalFile(null_out));

    std::unique_ptr<LogFile> current_wal;

    // Stage 1: nothing written, so the current WAL is expected to be empty.
    ASSERT_OK(dbfull()->GetCurrentWalFile(&current_wal));
    ASSERT_EQ(current_wal->StartSequence(), 0);
    ASSERT_EQ(current_wal->SizeFileBytes(), 0);
    ASSERT_EQ(current_wal->Type(), kAliveLogFile);
    ASSERT_GT(current_wal->LogNumber(), 0);

    // Stage 2: after some writes the WAL must have a non-zero size.
    ASSERT_OK(Put(0, "foo", "v1"));
    ASSERT_OK(Put(0, "foo2", "v2"));
    ASSERT_OK(Put(0, "foo3", "v3"));
    ASSERT_OK(dbfull()->GetCurrentWalFile(&current_wal));
    ASSERT_EQ(current_wal->StartSequence(), 0);
    ASSERT_GT(current_wal->SizeFileBytes(), 0);
    ASSERT_EQ(current_wal->Type(), kAliveLogFile);
    ASSERT_GT(current_wal->LogNumber(), 0);

    // Stage 3: reopen the DB eleven times in a row, write again and re-check.
    for (int reopen = 0; reopen < 11; ++reopen) {
      ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    }
    ASSERT_OK(Put(0, "foo4", "v4"));
    ASSERT_OK(Put(0, "foo5", "v5"));
    ASSERT_OK(Put(0, "foo6", "v6"));
    ASSERT_OK(dbfull()->GetCurrentWalFile(&current_wal));
    ASSERT_EQ(current_wal->StartSequence(), 0);
    ASSERT_GT(current_wal->SizeFileBytes(), 0);
    ASSERT_EQ(current_wal->Type(), kAliveLogFile);
    ASSERT_GT(current_wal->LogNumber(), 0);
  } while (ChangeWalOptions());
}
// Writes only to column family 1 and then compares the number of the first
// WAL reported by GetSortedWalFiles() before and after a reopen: the number
// after the reopen must be strictly larger. If no WAL is reported at all,
// uint64 max stands in for the number.
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Put(1, "foo", "v2"));

    // Index 0: before the reopen. Index 1: after the reopen.
    uint64_t first_wal_number[2];
    for (int round = 0; round < 2; ++round) {
      if (round > 0) {
        ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
      }
      VectorLogPtr wals;
      ASSERT_OK(dbfull()->GetSortedWalFiles(wals));
      first_wal_number[round] = wals.empty()
                                    ? std::numeric_limits<uint64_t>::max()
                                    : wals[0]->LogNumber();
    }

    ASSERT_LT(first_wal_number[0], first_wal_number[1]);
  } while (ChangeWalOptions());
}
// Writes two 200000-byte values and two 10-byte values without producing any
// level-0 file, then reopens with write_buffer_size = 100000 and expects
// recovery to have produced three level-0 files for column family 1 while all
// values stay readable.
TEST_F(DBWALTest, RecoverWithLargeLog) {
  const std::string big1_value(200000, '1');
  const std::string big2_value(200000, '2');
  const std::string small3_value(10, '3');
  const std::string small4_value(10, '4');

  do {
    {
      Options initial_opts = CurrentOptions();
      CreateAndReopenWithCF({"pikachu"}, initial_opts);
      ASSERT_OK(Put(1, "big1", big1_value));
      ASSERT_OK(Put(1, "big2", big2_value));
      ASSERT_OK(Put(1, "small3", small3_value));
      ASSERT_OK(Put(1, "small4", small4_value));
      // Everything is still in the memtable / WAL.
      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
    }

    // Reopen with a write buffer smaller than either of the big values.
    Options small_buffer_opts;
    small_buffer_opts.write_buffer_size = 100000;
    small_buffer_opts = CurrentOptions(small_buffer_opts);
    ReopenWithColumnFamilies({"default", "pikachu"}, small_buffer_opts);

    ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
    ASSERT_EQ(big1_value, Get(1, "big1"));
    ASSERT_EQ(big2_value, Get(1, "big2"));
    ASSERT_EQ(small3_value, Get(1, "small3"));
    ASSERT_EQ(small4_value, Get(1, "small4"));
    ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
  } while (ChangeWalOptions());
}
// Fills several column families under a 5000000-byte write buffer, then
// reopens with a 4096-byte write buffer and checks how many SST files each
// column family has after recovery.
// Column family indices: 0 = default, 1 = pikachu, 2 = dobrynia, 3 = nikitich.
TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
  Options opts = CurrentOptions();
  opts.write_buffer_size = 5000000;
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, opts);

  // Four 1000000-byte writes to the same key in 'pikachu'.
  for (int n = 0; n < 4; ++n) {
    ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  }
  ASSERT_OK(Put(3, Key(10), DummyString(1)));

  // A value larger than the write buffer goes into 'dobrynia'; wait for the
  // resulting flush of that column family.
  ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
  {
    const auto sst_files = ListTableFiles(env_, dbname_);
    ASSERT_EQ(sst_files.size(), static_cast<size_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "dobrynia"),
              static_cast<uint64_t>(1));
  }

  // A few more small writes after the flush.
  for (int n = 0; n < 2; ++n) {
    ASSERT_OK(Put(1, Key(1), DummyString(1)));
  }
  for (int n = 0; n < 3; ++n) {
    ASSERT_OK(Put(3, Key(10), DummyString(1)));
  }

  // Recover with a tiny write buffer.
  opts.write_buffer_size = 4096;
  opts.arena_block_size = 4096;
  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
                           opts);
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"),
              static_cast<uint64_t>(5));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "dobrynia"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "nikitich"),
              static_cast<uint64_t>(1));
  }
}
// Checks the number of SST files per column family after WAL recovery with
// avoid_flush_during_recovery = false.
// Column family indices: 0 = default, 1 = pikachu, 2 = dobrynia, 3 = nikitich.
// 'nikitich' is flushed twice before the reopen by writing a value larger
// than write_buffer_size; the other column families only receive tiny writes
// and stay unflushed until recovery.
TEST_F(DBWALTest, RecoverCheckFileAmount) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000;
  options.arena_block_size = 4 * 1024;
  options.avoid_flush_during_recovery = false;
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));

  // First oversized write into 'nikitich'; wait for its flush.
  ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  ASSERT_OK(Put(3, Key(1), DummyString(1)));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
  {
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), static_cast<size_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "nikitich"),
              static_cast<uint64_t>(1));
  }

  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));

  // Second oversized write into 'nikitich'; wait for its flush.
  ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  ASSERT_OK(Put(3, Key(1), DummyString(1)));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));

  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  {
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), static_cast<size_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "nikitich"),
              static_cast<uint64_t>(2));
  }

  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
                           options);
  {
    std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
    // The directory listing was previously fetched but never checked. The
    // total must match the per-column-family counts asserted below:
    // 1 (default) + 3 (nikitich) + 1 (dobrynia) + 1 (pikachu) = 6.
    ASSERT_EQ(table_files.size(), static_cast<size_t>(6));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "nikitich"),
              static_cast<uint64_t>(3));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_.get(), "pikachu"),
              static_cast<uint64_t>(1));
  }
}
// Issues two synced write batches of 1000 keys each against a DB with a
// 4096-byte write buffer and then calls SyncWAL(), which must succeed.
TEST_F(DBWALTest, SyncMultipleLogs) {
  const uint64_t kNumBatches = 2;
  const int kBatchSize = 1000;

  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  opts.write_buffer_size = 4096;
  Reopen(opts);

  WriteOptions synced_write;
  synced_write.sync = true;

  WriteBatch batch;
  for (uint64_t batch_idx = 0; batch_idx < kNumBatches; ++batch_idx) {
    // The batch object is reused; start each round from an empty batch.
    batch.Clear();
    for (int key_idx = 0; key_idx < kBatchSize; ++key_idx) {
      ASSERT_OK(batch.Put(Key(key_idx), DummyString(128)));
    }
    ASSERT_OK(dbfull()->Write(synced_write, &batch));
  }

  ASSERT_OK(dbfull()->SyncWAL());
}
// Disabled test (DISABLED_ prefix). With three recyclable WALs and
// point-in-time recovery it:
//   1. prepares two external SST files,
//   2. fills three WAL files with more than 500 bytes each,
//   3. writes key1..key3 interleaved with two external file ingestions,
//   4. closes the DB, verifies three >500-byte .log files are still present,
//   5. reopens and expects every key (written and ingested) to be readable.
TEST_F(DBWALTest, DISABLED_RecycleMultipleWalsCrash) {
Options options = CurrentOptions();
options.max_write_buffer_number = 5;
options.track_and_verify_wals_in_manifest = true;
options.max_bgerror_resume_count = 0; options.recycle_log_file_num = 3;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
// The callback overwrites the size_t handed to the sync point with 0.
// NOTE(review): presumably this suppresses the truncation done on close so
// that recycled WALs keep stale bytes, approximating a crash -- confirm
// against PosixWritableFile::Close.
SyncPoint::GetInstance()->SetCallBack(
"PosixWritableFile::Close",
[](void* arg) { *(static_cast<size_t*>(arg)) = 0; });
SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
// Ensure the DB is closed on every exit path, including assertion failures.
Defer closer([this]() { Close(); });
// The recycling setting must not have been altered when the DB was opened.
ASSERT_EQ(db_->GetOptions().recycle_log_file_num,
options.recycle_log_file_num);
// Build two single-key external SST files for later ingestion.
std::string sst_files_dir = dbname_ + "/sst_files/";
ASSERT_OK(DestroyDir(env_, sst_files_dir));
ASSERT_OK(env_->CreateDir(sst_files_dir));
std::string external_file1 = sst_files_dir + "file1.sst";
{
SstFileWriter sst_file_writer(EnvOptions(), options);
ASSERT_OK(sst_file_writer.Open(external_file1));
ASSERT_OK(sst_file_writer.Put("external1", "ex1"));
ExternalSstFileInfo file_info;
ASSERT_OK(sst_file_writer.Finish(&file_info));
}
std::string external_file2 = sst_files_dir + "file2.sst";
{
SstFileWriter sst_file_writer(EnvOptions(), options);
ASSERT_OK(sst_file_writer.Open(external_file2));
ASSERT_OK(sst_file_writer.Put("external2", "ex2"));
ExternalSstFileInfo file_info;
ASSERT_OK(sst_file_writer.Finish(&file_info));
}
// Write 500-byte random values separated by memtable switches while
// background work is paused, then flush. The check below expects this to
// leave three .log files of more than 500 bytes each.
ASSERT_OK(db_->PauseBackgroundWork());
ASSERT_OK(Put("ignore1", Random::GetTLSInstance()->RandomString(500)));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(Put("ignore2", Random::GetTLSInstance()->RandomString(500)));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(db_->ContinueBackgroundWork());
ASSERT_OK(Flush());
ASSERT_OK(Put("ignore3", Random::GetTLSInstance()->RandomString(500)));
ASSERT_OK(Flush());
// Count the .log files in the DB directory and check their sizes.
std::vector<FileAttributes> files;
int log_count = 0;
ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
for (const auto& f : files) {
if (EndsWith(f.name, ".log")) {
EXPECT_GT(f.size_bytes, 500);
++log_count;
}
}
EXPECT_EQ(log_count, 3);
// Write the keys that must survive, with background work paused so nothing
// is flushed; ingestions are interleaved with the writes. Each ingestion
// happens while a snapshot is held.
ASSERT_OK(db_->PauseBackgroundWork());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
{
ManagedSnapshot snapshot(db_.get());
ASSERT_OK(
db_->IngestExternalFile({external_file1}, IngestExternalFileOptions()));
}
ASSERT_OK(Put("key3", "val3"));
ASSERT_OK(db_->SyncWAL());
{
ManagedSnapshot snapshot(db_.get());
ASSERT_OK(
db_->IngestExternalFile({external_file2}, IngestExternalFileOptions()));
}
Close();
// After closing there must still be three .log files larger than 500 bytes.
files.clear();
log_count = 0;
ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
for (const auto& f : files) {
if (EndsWith(f.name, ".log")) {
EXPECT_GT(f.size_bytes, 500);
++log_count;
}
}
EXPECT_EQ(log_count, 3);
// Recovery must bring back every written and ingested key.
Reopen(options);
EXPECT_EQ("val1", Get("key1"));
EXPECT_EQ("val2", Get("key2")); EXPECT_EQ("ex1", Get("external1"));
EXPECT_EQ("val3", Get("key3")); EXPECT_EQ("ex2", Get("external2"));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Injects a Sync() failure into the second of two WAL file syncs performed by
// a single SyncWAL() call and verifies that the DB then keeps failing
// SyncWAL(), cannot Resume(), and still has all data after a reopen.
TEST_F(DBWALTest, SyncWalPartialFailure) {
  // FileSystem wrapper whose writable files share a countdown: each Sync()
  // decrements `syncs_before_failure_`, and the call that observes the value
  // 0 fails with an IOError instead of syncing.
  class MyTestFileSystem : public FileSystemWrapper {
   public:
    explicit MyTestFileSystem(std::shared_ptr<FileSystem> base)
        : FileSystemWrapper(std::move(base)) {}

    static const char* kClassName() { return "MyTestFileSystem"; }
    const char* Name() const override { return kClassName(); }

    // Wraps every successfully created writable file in MyTestWritableFile.
    IOStatus NewWritableFile(const std::string& fname,
                             const FileOptions& file_opts,
                             std::unique_ptr<FSWritableFile>* result,
                             IODebugContext* dbg) override {
      IOStatus io_s = target()->NewWritableFile(fname, file_opts, result, dbg);
      if (!io_s.ok()) {
        return io_s;
      }
      *result =
          std::make_unique<MyTestWritableFile>(std::move(*result), *this);
      return io_s;
    }

    // Number of Sync() calls that still succeed before one fails.
    Atomic<uint32_t> syncs_before_failure_{UINT32_MAX};

   protected:
    class MyTestWritableFile : public FSWritableFileOwnerWrapper {
     public:
      MyTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
                         MyTestFileSystem& fs)
          : FSWritableFileOwnerWrapper(std::move(file)), fs_(fs) {}

      IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
        // Value of the countdown before this call decremented it.
        const int countdown_before = fs_.syncs_before_failure_.FetchSub(1);
        if (countdown_before != 0) {
          return target()->Sync(options, dbg);
        }
        return IOStatus::IOError("fault");
      }

     protected:
      MyTestFileSystem& fs_;
    };
  };

  Options options = CurrentOptions();
  options.max_write_buffer_number = 4;
  options.track_and_verify_wals_in_manifest = true;
  options.max_bgerror_resume_count = 0;

  // Route all file system access of the DB through the fault-injecting
  // wrapper. `fault_fs_env` is declared before `closer`, so the DB is closed
  // before the Env it uses is destroyed.
  auto custom_fs =
      std::make_shared<MyTestFileSystem>(options.env->GetFileSystem());
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(custom_fs));
  options.env = fault_fs_env.get();
  Reopen(options);
  Defer closer([this]() { Close(); });

  // Background work stays paused so the switched memtables are not flushed.
  ASSERT_OK(db_->PauseBackgroundWork());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(db_->SyncWAL());

  ASSERT_OK(Put("key2", "val2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "val3"));

  // Allow exactly one more successful Sync(); the one after it fails.
  custom_fs->syncs_before_failure_.Store(1);
  ASSERT_NOK(db_->SyncWAL());
  // Subsequent attempts must keep failing, and Resume() must not succeed.
  ASSERT_NOK(db_->SyncWAL());
  ASSERT_NOK(db_->Resume());

  // No data may be lost across a reopen.
  Reopen(options);
  ASSERT_EQ("val1", Get("key1"));
  ASSERT_EQ("val2", Get("key2"));
  ASSERT_EQ("val3", Get("key3"));
}
// Mixes WAL-enabled and WAL-disabled writes across two column families,
// deactivates the fault-injection file system before closing, and verifies
// that the last WAL-enabled write to "key" ("v5") is what is read back after
// reopening.
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
  auto fault_env = std::make_unique<FaultInjectionTestEnv>(env_);

  Options options = CurrentOptions();
  options.env = fault_env.get();
  options.disable_auto_compactions = true;

  WriteOptions with_wal;
  with_wal.sync = true;
  with_wal.disableWAL = false;

  WriteOptions without_wal;
  without_wal.disableWAL = true;

  CreateAndReopenWithCF({"dummy"}, options);

  // Column family 1: one logged write followed by two unlogged ones.
  ASSERT_OK(Put(1, "dummy", "d1", with_wal));
  ASSERT_OK(Put(1, "dummy", "d2", without_wal));
  ASSERT_OK(Put(1, "dummy", "d3", without_wal));

  // Column family 0: logged write, flush, then another logged write.
  ASSERT_OK(Put(0, "key", "v4", with_wal));
  ASSERT_OK(Flush(0));
  ASSERT_OK(Put(0, "key", "v5", with_wal));
  ASSERT_EQ("v5", Get(0, "key"));
  ASSERT_OK(dbfull()->FlushWAL(false));

  // Turn the file system off before closing, then reset the env and reopen.
  fault_env->SetFilesystemActive(false);
  Close();
  fault_env->ResetState();
  ReopenWithColumnFamilies({"default", "dummy"}, options);

  ASSERT_EQ("v5", Get(0, "key"));

  // Destroy the DB while `fault_env` (used by `options`) is still alive.
  Destroy(options);
}
// Static helpers shared by the WAL recovery tests: they write WAL files
// directly (bypassing the DB write path), count which of the generated keys a
// recovered DB contains, and damage a chosen WAL file.
class RecoveryTestHelper {
public:
// Number of WAL files written by the two-argument FillData().
static constexpr int kWALFilesCount = 10;
// Log number of the first generated WAL file.
static constexpr int kWALFileOffset = 10;
// Number of key/value records written to each WAL file.
static constexpr int kKeysPerWALFile = 133;
// Length passed to DummyString() for every value.
static constexpr int kValueSize = 96;
// Writes `wal_count` WAL files numbered kWALFileOffset,
// kWALFileOffset + 1, ... into test->dbname_. Each file gets a compression
// type record followed by kKeysPerWALFile single-Put batches with keys
// "key<N>", where N is the running value of *count. *count is reset to 0 on
// entry and holds the total number of keys written on return. Sequence
// numbers are taken from a locally constructed VersionSet.
static void FillData(DBWALTestBase* test, const Options& options,
const size_t wal_count, size_t* count) {
Options sanitized_options = SanitizeOptions(test->dbname_, options);
const ImmutableDBOptions db_options(sanitized_options);
*count = 0;
std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
FileOptions file_options;
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
std::unique_ptr<VersionSet> versions;
std::unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(
test->dbname_, &db_options, MutableDBOptions{options}, file_options,
table_cache.get(), &write_buffer_manager, &write_controller,
nullptr,
nullptr, "", "",
options.daily_offpeak_time_utc,
nullptr, false));
wal_manager.reset(
new WalManager(db_options, file_options, nullptr));
std::unique_ptr<log::Writer> current_log_writer;
for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
uint64_t current_log_number = j;
std::string fname = LogFileName(test->dbname_, current_log_number);
std::unique_ptr<WritableFileWriter> file_writer;
ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
fname, file_options, &file_writer,
nullptr));
log::Writer* log_writer =
new log::Writer(std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0, false,
db_options.wal_compression);
ASSERT_OK(log_writer->AddCompressionTypeRecord(WriteOptions()));
// Ownership of the raw writer moves to the unique_ptr; the writer of the
// previous WAL file is destroyed here.
current_log_writer.reset(log_writer);
WriteBatch batch;
for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + std::to_string((*count)++);
std::string value = test->DummyString(kValueSize);
ASSERT_NE(current_log_writer.get(), nullptr);
// Each record gets the next sequence number after the last one recorded.
uint64_t seq = versions->LastSequence() + 1;
batch.Clear();
ASSERT_OK(batch.Put(key, value));
WriteBatchInternal::SetSequence(&batch, seq);
ASSERT_OK(current_log_writer->AddRecord(
WriteOptions(), WriteBatchInternal::Contents(&batch)));
versions->SetLastAllocatedSequence(seq);
versions->SetLastPublishedSequence(seq);
versions->SetLastSequence(seq);
}
}
}
// Recreates an empty DB (forcing create_if_missing = true on *options),
// closes it, writes kWALFilesCount WAL files into its directory and returns
// the number of keys written.
static size_t FillData(DBWALTestBase* test, Options* options) {
options->create_if_missing = true;
test->DestroyAndReopen(*options);
test->Close();
size_t count = 0;
FillData(test, *options, kWALFilesCount, &count);
return count;
}
// Returns how many of the keys "key0" .. "key<kWALFilesCount *
// kKeysPerWALFile - 1>" are present in the currently open DB.
static size_t GetData(DBWALTestBase* test) {
size_t count = 0;
for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") {
++count;
}
}
return count;
}
// Damages the WAL file with number `wal_file_id`. `off` and `len` are
// fractions of the file size. With `trunc` set, the file is truncated to
// size * off bytes; otherwise test::CorruptFile is invoked with
// size * off + 8 and size * len.
// NOTE(review): presumably those two values are the start offset and the
// number of bytes to corrupt, and the trailing `false` disables some
// verification step -- confirm against test::CorruptFile.
static void CorruptWAL(DBWALTestBase* test, const Options& options,
const double off, const double len,
const int wal_file_id, const bool trunc = false) {
Env* env = options.env;
std::string fname = LogFileName(test->dbname_, wal_file_id);
uint64_t size;
ASSERT_OK(env->GetFileSize(fname, &size));
ASSERT_GT(size, 0);
// On Windows the DB is closed before the file is modified.
#ifdef OS_WIN
test->Close();
#endif
if (trunc) {
ASSERT_OK(
test::TruncateFile(env, fname, static_cast<uint64_t>(size * off)));
} else {
ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8),
static_cast<int>(size * len), false));
}
}
};
// With track_and_verify_wals and WAL recycling both enabled under
// point-in-time recovery: after two write+flush rounds, "key1" is written
// into two consecutive WALs, the first of the two remaining WALs is truncated
// to zero bytes, and the DB must still open. The flushed data must be
// readable and "key1" must be absent.
TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) {
  Options opts = CurrentOptions();
  opts.avoid_flush_during_shutdown = true;
  opts.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  opts.recycle_log_file_num = 1;
  opts.track_and_verify_wals = true;
  DestroyAndReopen(opts);

  // Two identical rounds of three writes followed by a flush.
  for (int round = 0; round < 2; ++round) {
    ASSERT_OK(Put("key_ignore", "wal_to_recycle"));
    ASSERT_OK(Put("key_ignore1", "wal_to_recycle"));
    ASSERT_OK(Put("key_ignore2", "wal_to_recycle"));
    ASSERT_OK(Flush());
  }

  // Occupy the single HIGH-priority background thread with a sleeping task.
  opts.env->SetBackgroundThreads(1, Env::HIGH);
  test::SleepingBackgroundTask blocker;
  opts.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &blocker,
                     Env::Priority::HIGH);

  // Write "key1" on both sides of a WAL switch.
  ASSERT_OK(Put("key1", "old_value"));
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_OK(Put("key1", "new_value"));

  VectorWalPtr wals;
  ASSERT_OK(db_->GetSortedWalFiles(wals));
  ASSERT_EQ(wals.size(), 2);
  const std::string first_wal_path =
      LogFileName(dbname_, wals.front()->LogNumber());
  Close();

  // Empty the first WAL and reopen.
  ASSERT_OK(test::TruncateFile(opts.env, first_wal_path, 0));
  ASSERT_OK(DB::Open(opts, dbname_, &db_));

  ASSERT_EQ("wal_to_recycle", Get("key_ignore2"));
  ASSERT_EQ("NOT_FOUND", Get("key1"));
  Close();
}
// Fixture for track_and_verify_wals tests that are parameterized on the WAL
// recovery mode; the mode is obtained through GetParam(). The DB lives under
// the "/db_wal_track_and_verify_wals_with_params_test" directory name.
class DBWALTrackAndVerifyWALsWithParamsTest
: public DBWALTestBase,
public ::testing::WithParamInterface<WALRecoveryMode> {
public:
DBWALTrackAndVerifyWALsWithParamsTest()
: DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {}
};
// Instantiate the fixture once for each of the four WAL recovery modes.
INSTANTIATE_TEST_CASE_P(
DBWALTrackAndVerifyWALsWithParamsTest,
DBWALTrackAndVerifyWALsWithParamsTest,
::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
WALRecoveryMode::kAbsoluteConsistency,
WALRecoveryMode::kPointInTimeRecovery,
WALRecoveryMode::kSkipAnyCorruptedRecords));
// Runs five scenarios (i = 0..4) against a DB with track_and_verify_wals
// enabled and the parameterized WAL recovery mode. Every scenario writes
// "key1" into two consecutive WALs, closes the DB, optionally tampers with
// the WAL files, and then checks the outcome of DB::Open:
//   i == 0: no tampering; background work is allowed to finish first.
//   i == 1: the first WAL is deleted.
//   i == 2: the first WAL is truncated to zero bytes.
//   i == 3: as i == 2, plus a sync-point callback that overrides the sequence
//           number handed to "DBImpl::UpdatePredecessorWALInfo" for the first
//           WAL.
//   i == 4: both WALs are deleted.
TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) {
Options options = CurrentOptions();
options.avoid_flush_during_shutdown = true;
options.track_and_verify_wals = true;
options.wal_recovery_mode = GetParam();
// Occupy the single HIGH-priority background thread with a sleeping task.
options.env->SetBackgroundThreads(1, Env::HIGH);
test::SleepingBackgroundTask sleeping_task;
options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::HIGH);
for (int i = 0; i < 5; i++) {
DestroyAndReopen(options);
ASSERT_OK(Put("key1", "old_value"));
// Latest sequence number at the time the first WAL is switched out
// (used by the i == 3 callback).
SequenceNumber last_seqno_recorded_in_fist_wal =
dbfull()->GetLatestSequenceNumber();
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(Put("key1", "new_value"));
VectorWalPtr log_files;
ASSERT_OK(db_->GetSortedWalFiles(log_files));
ASSERT_EQ(log_files.size(), 2);
uint64_t first_log_number = log_files.front()->LogNumber();
std::string first_log_name = LogFileName(dbname_, first_log_number);
std::string second_log_name =
LogFileName(dbname_, log_files.back()->LogNumber());
if (i == 0) {
// Let the sleeping task finish, wait for background work, then schedule
// the sleeping task again for the remaining iterations.
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
sleeping_task.Reset();
options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task, Env::Priority::HIGH);
Close();
} else if (i == 1) {
Close();
ASSERT_OK(options.env->DeleteFile(first_log_name));
} else if (i == 2) {
Close();
ASSERT_OK(
test::TruncateFile(options.env, first_log_name, 0 ));
} else if (i == 3) {
Close();
ASSERT_OK(
test::TruncateFile(options.env, first_log_name, 0 ));
// The sync point passes a (log number, sequence number pointer) pair; for
// the first WAL the callback writes the sequence number captured above
// through the pointer.
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::UpdatePredecessorWALInfo", [&](void* arg) {
std::pair<uint64_t, SequenceNumber*>* pair =
static_cast<std::pair<uint64_t, SequenceNumber*>*>(arg);
if (pair->first == first_log_number) {
*(pair->second) = last_seqno_recorded_in_fist_wal;
}
});
SyncPoint::GetInstance()->EnableProcessing();
} else if (i == 4) {
Close();
ASSERT_OK(options.env->DeleteFile(first_log_name));
ASSERT_OK(options.env->DeleteFile(second_log_name));
}
Status s;
{
s = DB::Open(options, dbname_, &db_);
}
if (i == 0) {
// Untampered WALs: open succeeds and the latest value is visible.
ASSERT_OK(s);
ASSERT_EQ("new_value", Get("key1"));
continue;
} else if (i == 3) {
// The callback is only needed during the open above.
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
} else if (i == 4) {
// With no WAL left, the open fails with Corruption in every mode.
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(
s.ToString().find("Opening an existing DB with no WAL files") !=
std::string::npos);
Close();
continue;
}
// Remaining scenarios (i = 1, 2, 3): the expectation depends on the mode.
if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
// Open succeeds but "key1" is not recovered.
ASSERT_OK(s);
ASSERT_EQ("NOT_FOUND", Get("key1"));
} else if (options.wal_recovery_mode ==
WALRecoveryMode::kAbsoluteConsistency ||
options.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords) {
// Open fails with Corruption and a scenario-specific message.
ASSERT_TRUE(s.IsCorruption());
std::string msg;
if (i == 1) {
msg = "Missing WAL";
} else if (i == 2) {
msg = "Mismatched last sequence number recorded in the WAL";
} else if (i == 3) {
msg = "Mismatched size of the WAL";
}
ASSERT_TRUE(s.ToString().find(msg) != std::string::npos);
} else {
// Remaining mode (kSkipAnyCorruptedRecords, per the instantiation): open
// succeeds and the value from the second WAL is visible.
ASSERT_OK(s);
ASSERT_EQ("new_value", Get("key1"));
}
Close();
}
}
// Fixture for WAL corruption tests. As read by the tests in this file, the
// parameter tuple is:
//   <0> bool            - truncate the WAL (true) or corrupt bytes (false)
//   <1> int             - corruption offset multiplier
//   <2> int             - number of the WAL file to damage
//   <3> CompressionType - WAL compression
//   <4> bool            - value for options.track_and_verify_wals
class DBWALTestWithParams
: public DBWALTestBase,
public ::testing::WithParamInterface<
std::tuple<bool, int, int, CompressionType, bool>> {
public:
DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
};
// Full cross product of the parameter ranges: both bool values, offset
// multipliers starting at 0 (end bound 4), WAL file numbers starting at
// kWALFileOffset (end bound kWALFileOffset + kWALFilesCount), two compression
// types, and both bool values for track_and_verify_wals.
INSTANTIATE_TEST_CASE_P(
Wal, DBWALTestWithParams,
::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1),
::testing::Range(RecoveryTestHelper::kWALFileOffset,
RecoveryTestHelper::kWALFileOffset +
RecoveryTestHelper::kWALFilesCount,
1),
::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD),
::testing::Bool()));
// Fixture parameterized on a <bool, int, int, WALRecoveryMode,
// CompressionType> tuple.
// NOTE(review): the tests using this fixture are not visible here; the first
// three fields presumably mirror DBWALTestWithParams (truncate flag, offset
// multiplier, WAL file number) -- confirm against those tests.
class DBWALTestWithParamsVaryingRecoveryMode
: public DBWALTestBase,
public ::testing::WithParamInterface<
std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {
public:
DBWALTestWithParamsVaryingRecoveryMode()
: DBWALTestBase("/db_wal_test_with_params_mode") {}
};
// Full cross product: both bool values, ints starting at 0 (end bound 4), WAL
// file numbers starting at kWALFileOffset (end bound kWALFileOffset +
// kWALFilesCount), all four recovery modes, and two compression types.
INSTANTIATE_TEST_CASE_P(
Wal, DBWALTestWithParamsVaryingRecoveryMode,
::testing::Combine(
::testing::Bool(), ::testing::Range(0, 4, 1),
::testing::Range(RecoveryTestHelper::kWALFileOffset,
RecoveryTestHelper::kWALFileOffset +
RecoveryTestHelper::kWALFilesCount,
1),
::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
WALRecoveryMode::kAbsoluteConsistency,
WALRecoveryMode::kPointInTimeRecovery,
WALRecoveryMode::kSkipAnyCorruptedRecords),
::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD)));
// Under kTolerateCorruptedTailRecords, a truncated WAL must still allow the
// DB to open with only part of the data recovered, while a WAL with corrupted
// bytes must make the open fail.
TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) {
  const auto& params = GetParam();
  const bool trunc = std::get<0>(params);
  const int corrupt_offset = std::get<1>(params);
  const int wal_file_id = std::get<2>(params);

  Options options = CurrentOptions();
  options.track_and_verify_wals = std::get<4>(params);

  // Generate the WAL files, then damage the selected one.
  const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, .1,
                                 wal_file_id, trunc);

  options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
  if (!trunc) {
    // Corrupted bytes are not tolerated in this mode.
    ASSERT_NOK(TryReopen(options));
    return;
  }

  // Truncation: the open succeeds and strictly fewer rows than were written
  // come back. Unless the offset multiplier is 0, at least one row is
  // recovered.
  options.create_if_missing = false;
  ASSERT_OK(TryReopen(options));
  const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0);
  ASSERT_LT(recovered_row_count, row_count);
}
// Under kAbsoluteConsistency, undamaged WALs must recover every row, and any
// damaged WAL must make the open fail. The combination "truncate with offset
// multiplier 0" is skipped after the undamaged check.
TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
  const auto& params = GetParam();

  // Part 1: undamaged WALs recover everything.
  Options options = CurrentOptions();
  options.track_and_verify_wals = std::get<4>(params);
  const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  options.create_if_missing = false;
  ASSERT_OK(TryReopen(options));
  ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);

  const bool trunc = std::get<0>(params);
  const int corrupt_offset = std::get<1>(params);
  const int wal_file_id = std::get<2>(params);
  const CompressionType compression_type = std::get<3>(params);
  options.wal_compression = compression_type;

  if (trunc && corrupt_offset == 0) {
    return;
  }

  // Part 2: regenerate the WALs, damage one, and expect the open to fail.
  RecoveryTestHelper::FillData(this, &options);
  RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, .1,
                                 wal_file_id, trunc);
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  options.create_if_missing = false;
  ASSERT_NOK(TryReopen(options));
}
// Corrupts the WAL at the offset where the third write begins, continues
// writing to two column families, flushes one of them, and expects
// point-in-time recovery to refuse to open the DB.
TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
  Options opts = CurrentOptions();
  opts.avoid_flush_during_recovery = true;
  CreateAndReopenWithCF({"one", "two"}, opts);

  ASSERT_OK(Put(1, "key1", "val1"));
  ASSERT_OK(Put(2, "key2", "val2"));

  // The current WAL size marks where the next record will start.
  Env* const wal_env = opts.env;
  const uint64_t current_wal_number = dbfull()->TEST_LogfileNumber();
  const std::string wal_path = LogFileName(dbname_, current_wal_number);
  uint64_t corruption_offset;
  ASSERT_OK(wal_env->GetFileSize(wal_path, &corruption_offset));
  ASSERT_GT(corruption_offset, 0);

  // Write the record that gets corrupted, then corrupt 4 bytes at its start.
  ASSERT_OK(Put(1, "key3", "val3"));
  ASSERT_OK(test::CorruptFile(wal_env, wal_path,
                              static_cast<int>(corruption_offset), 4, false));

  // More writes after the corrupted record; only column family 2 is flushed.
  ASSERT_OK(Put(2, "key4", "val4"));
  ASSERT_OK(Put(1, "key5", "val5"));
  ASSERT_OK(Flush(2));

  opts.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, opts));
}
// Orchestrates two background flushes (one per column family) with sync
// points so that the second flush reaches
// "...TryInstallMemtableFlushResults:AfterComputeMinWalToKeep" while the first
// one is between "VersionSet::LogAndApply:WriteManifestStart" and
// "VersionSet::LogAndApply:WriteManifest". Afterwards the DB must still open
// in read-only mode with track_and_verify_wals_in_manifest enabled.
TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
Options options = CurrentOptions();
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
options.max_background_jobs = 8;
DestroyAndReopen(options);
const std::string cf1_name("cf1");
CreateAndReopenWithCF({cf1_name}, options);
assert(handles_.size() == 2);
// The scenario needs at least two concurrent flushes.
{
dbfull()->TEST_LockMutex();
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
dbfull()->TEST_UnlockMutex();
}
// Queue one flush for each column family while background work is paused.
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(
false, true, handles_[1]));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(
false, true, handles_[0]));
// Set by the first invocation of the AfterComputeMinWalToKeep callback.
bool called = false;
std::atomic<int> bg_flush_threads{0};
std::atomic<bool> wal_synced{false};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// Only the first flush thread to arrive proceeds immediately; any later one
// spins until the CommitWal:1 sync point has been reached.
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void* ) {
int cur = bg_flush_threads.load();
int desired = cur + 1;
if (cur > 0 ||
!bg_flush_threads.compare_exchange_strong(cur, desired)) {
while (!wal_synced.load()) {
}
}
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushMemTableToOutputFile:CommitWal:1",
[&](void* ) { wal_synced.store(true); });
// First invocation: install the dependencies that hold the manifest write
// until the BgFlush2 point has been passed. Second invocation: pass the
// BgFlush2 point. The callback runs with the DB mutex held.
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
[&](void* ) {
dbfull()->mutex()->AssertHeld();
if (!called) {
called = true;
SyncPoint::GetInstance()->LoadDependency({
{"VersionSet::LogAndApply:WriteManifestStart",
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
"VersionSet::LogAndApply:WriteManifest"},
});
} else {
TEST_SYNC_POINT(
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
}
});
SyncPoint::GetInstance()->EnableProcessing();
// Release the queued flushes and wait for both of them.
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_TRUE(called);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// The DB must be openable read-only after the race.
std::unique_ptr<DB> db1;
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
ASSERT_OK(s);
assert(db1);
}
// Calls SyncWAL() from inside the "FindObsoleteFiles::PostMutexUnlock" sync
// point, while the WAL file "000004.log" still exists, and requires that
// SyncWAL() moves the DB to the next MANIFEST file number. After the flush
// completes the WAL must be gone, and the DB must still reopen cleanly.
TEST_F(DBWALTest, FixSyncWalOnObseletedWalWithNewManifestCausingMissingWAL) {
  Options opts = CurrentOptions();
  // Manifest limits set to their minimum values.
  opts.max_manifest_file_size = 1;
  opts.max_manifest_space_amp_pct = 0;
  opts.track_and_verify_wals_in_manifest = true;
  DestroyAndReopen(opts);

  for (int k = 1; k <= 3; ++k) {
    ASSERT_OK(Put(Key(k), ""));
  }

  const std::string wal_file_path = db_->GetName() + "/000004.log";
  bool sync_wal_ran = false;

  auto* const sync_points = SyncPoint::GetInstance();
  sync_points->SetCallBack(
      "FindObsoleteFiles::PostMutexUnlock", [&](void*) {
        // The WAL has not been deleted yet at this point.
        ASSERT_OK(env_->FileExists(wal_file_path));

        const uint64_t manifest_no_before =
            dbfull()->TEST_Current_Manifest_FileNo();
        ASSERT_OK(db_->SyncWAL());
        const uint64_t manifest_no_after =
            dbfull()->TEST_Current_Manifest_FileNo();

        // SyncWAL() must have switched to the next manifest file number.
        ASSERT_TRUE(manifest_no_after == manifest_no_before + 1);
        sync_wal_ran = true;
      });
  sync_points->EnableProcessing();

  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());

  ASSERT_TRUE(sync_wal_ran);
  ASSERT_TRUE(env_->FileExists(wal_file_path).IsNotFound());

  sync_points->ClearAllCallBacks();
  sync_points->DisableProcessing();

  const Status reopen_status = TryReopen(opts);
  EXPECT_OK(reopen_status);
}
// Under kPointInTimeRecovery a damaged WAL must still allow the DB to open,
// with strictly fewer rows than were written. Except for "truncate with
// offset multiplier 0", the recovered keys must form a prefix of the key
// sequence. The recovered count is bounded from below by the rows of all WAL
// files before the damaged one and, for byte corruption with a non-zero
// offset multiplier, from above by the rows up to and including it.
TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
  const int maxkeys =
      RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;

  const auto& params = GetParam();
  const bool trunc = std::get<0>(params);
  const int corrupt_offset = std::get<1>(params);
  const int wal_file_id = std::get<2>(params);
  const CompressionType compression_type = std::get<3>(params);

  Options options = CurrentOptions();
  options.track_and_verify_wals = std::get<4>(params);
  options.wal_compression = compression_type;

  // Generate the WAL files, then damage the selected one.
  const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, .1,
                                 wal_file_id, trunc);

  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  options.create_if_missing = false;
  ASSERT_OK(TryReopen(options));

  const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  ASSERT_LT(recovered_row_count, row_count);

  if (!trunc || corrupt_offset != 0) {
    // Once one key is missing, every later key must be missing as well.
    bool still_in_prefix = true;
    for (size_t k = 0; k < static_cast<size_t>(maxkeys); ++k) {
      const bool found = Get("key" + std::to_string(k)) != "NOT_FOUND";
      still_in_prefix = still_in_prefix && found;
      ASSERT_EQ(found, still_in_prefix);
    }
  }

  // Index of the damaged WAL among the generated files (0-based).
  const int damaged_wal_index =
      wal_file_id - RecoveryTestHelper::kWALFileOffset;

  const size_t min_expected =
      RecoveryTestHelper::kKeysPerWALFile * damaged_wal_index;
  ASSERT_GE(recovered_row_count, min_expected);

  if (!trunc && corrupt_offset != 0) {
    const size_t max_expected =
        RecoveryTestHelper::kKeysPerWALFile * (damaged_wal_index + 1);
    ASSERT_LE(recovered_row_count, max_expected);
  }
}
// Corrupts one WAL file and reopens the DB with
// WALRecoveryMode::kSkipAnyCorruptedRecords. The reopen must succeed and
// strictly fewer rows than were written must be readable.
TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
  const auto& params = GetParam();
  const bool truncate_wal = std::get<0>(params);
  const int offset_index = std::get<1>(params);
  const int target_wal = std::get<2>(params);
  const CompressionType wal_compression = std::get<3>(params);

  // Populate the DB.
  Options options = CurrentOptions();
  options.track_and_verify_wals = std::get<4>(params);
  options.wal_compression = wal_compression;
  const size_t written_rows = RecoveryTestHelper::FillData(this, &options);

  // Damage the selected WAL file.
  RecoveryTestHelper::CorruptWAL(this, options, offset_index * .3, .1,
                                 target_wal, truncate_wal);

  // Reopening must succeed in skip-any-corrupted-records mode.
  options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
  options.create_if_missing = false;
  ASSERT_OK(TryReopen(options));

  // Some data must have been lost to the corruption.
  const size_t recovered_rows = RecoveryTestHelper::GetData(this);
  ASSERT_LT(recovered_rows, written_rows);

  // The remaining check only applies when the WAL was not truncated.
  if (truncate_wal) {
    return;
  }
  // With offset index 0, at least one row must still have been recovered.
  ASSERT_TRUE(offset_index != 0 || recovered_rows > 0);
}
// Exercises Options::avoid_flush_during_recovery in three phases, checking the
// recovered values and the table-file count after each reopen.
TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
  // Writes "foo" and then "bar"; the second Put is skipped if the first one
  // fails, and the first failing status is returned.
  auto put_foo_bar = [this](const std::string& foo_value,
                            const std::string& bar_value) {
    Status put_status = Put("foo", foo_value);
    if (put_status.ok()) {
      put_status = Put("bar", bar_value);
    }
    return put_status;
  };

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;

  // Phase 1: avoid_flush_during_recovery is off. The reopen is expected to
  // produce one additional table file.
  options.avoid_flush_during_recovery = false;
  Reopen(options);
  ASSERT_OK(put_foo_bar("v1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v3", "v4"));
  ASSERT_EQ(1, TotalTableFiles());
  Reopen(options);
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v4", Get("bar"));
  ASSERT_EQ(2, TotalTableFiles());

  // Phase 2: avoid_flush_during_recovery is on, starting from a fresh DB. The
  // reopen must not change the table-file count.
  options.avoid_flush_during_recovery = true;
  DestroyAndReopen(options);
  ASSERT_OK(put_foo_bar("v5", "v6"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v7", "v8"));
  ASSERT_EQ(1, TotalTableFiles());
  Reopen(options);
  ASSERT_EQ("v7", Get("foo"));
  ASSERT_EQ("v8", Get("bar"));
  ASSERT_EQ(1, TotalTableFiles());

  // Phase 3: allow_2pc is additionally set. The changed options are only
  // handed to the DB by the Reopen below; three table files are expected
  // afterwards.
  options.avoid_flush_during_recovery = true;
  options.allow_2pc = true;
  ASSERT_OK(put_foo_bar("v9", "v10"));
  ASSERT_OK(Flush());
  ASSERT_OK(put_foo_bar("v11", "v12"));
  Reopen(options);
  ASSERT_EQ("v11", Get("foo"));
  ASSERT_EQ("v12", Get("bar"));
  ASSERT_EQ(3, TotalTableFiles());
}
// With avoid_flush_during_recovery set, WAL files must still be listed right
// after a reopen, and none may remain listed after a subsequent Flush().
TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.avoid_flush_during_recovery = true;
  Reopen(options);

  ASSERT_OK(Put("foo", "v1"));
  Reopen(options);

  // Before any flush: at least one WAL file is reported.
  {
    VectorLogPtr wals_before_flush;
    ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before_flush));
    ASSERT_GT(wals_before_flush.size(), 0);
  }

  ASSERT_OK(Flush());

  // After the flush: no WAL file is reported any more.
  {
    VectorLogPtr wals_after_flush;
    ASSERT_OK(dbfull()->GetSortedWalFiles(wals_after_flush));
    ASSERT_EQ(0, wals_after_flush.size());
  }
}
// Reopens the DB repeatedly with avoid_flush_during_recovery set and checks
// that both the initially filled keys and the latest "foo"/"bar" values are
// readable after every reopen.
TEST_F(DBWALTest, RecoverWithoutFlush) {
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 64 * 1024 * 1024;

  const size_t filled_keys = RecoveryTestHelper::FillData(this, &options);

  // Checks that every "key<i>" written by FillData can be read. Note that a
  // failing ASSERT here only returns from this lambda.
  auto expect_filled_keys_present = [this, filled_keys]() {
    size_t idx = 0;
    while (idx < filled_keys) {
      ASSERT_NE(Get("key" + std::to_string(idx)), "NOT_FOUND");
      ++idx;
    }
  };

  // Writes "foo" -> "foo_<tag>" and then "bar" -> "bar_<tag>"; the second Put
  // is skipped if the first one fails.
  auto put_version = [this](const std::string& tag) {
    Status put_status = Put("foo", "foo_" + tag);
    if (put_status.ok()) {
      put_status = Put("bar", "bar_" + tag);
    }
    return put_status;
  };

  Reopen(options);
  expect_filled_keys_present();

  // Round 1: write without flushing, then reopen.
  ASSERT_OK(put_version("v1"));
  Reopen(options);
  expect_filled_keys_present();
  ASSERT_EQ(Get("foo"), "foo_v1");
  ASSERT_EQ(Get("bar"), "bar_v1");

  // Round 2: overwrite and reopen again.
  ASSERT_OK(put_version("v2"));
  Reopen(options);
  expect_filled_keys_present();
  ASSERT_EQ(Get("foo"), "foo_v2");
  ASSERT_EQ(Get("bar"), "bar_v2");

  // Round 3: flush manually, overwrite, reopen.
  ASSERT_OK(Flush());
  ASSERT_EQ(Get("foo"), "foo_v2");
  ASSERT_EQ(Get("bar"), "bar_v2");
  ASSERT_OK(put_version("v3"));
  Reopen(options);
  expect_filled_keys_present();
  ASSERT_EQ(Get("foo"), "foo_v3");
  ASSERT_EQ(Get("bar"), "bar_v3");
}
// Recovery with avoid_flush_during_recovery across three column families:
// tracks the number of WAL files as data is written/flushed and verifies all
// keys after repeated reopens.
TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
  const std::string kSmallValue = "v";
  const std::string kLargeValue = DummyString(1024);
  const std::vector<std::string> kAllColumnFamilies = {"default", "one",
                                                       "two"};

  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;

  // Number of WAL files reported by the DB; 0 if the query itself fails.
  auto wal_file_count = [this]() -> size_t {
    VectorLogPtr wals;
    const Status query_status = dbfull()->GetSortedWalFiles(wals);
    return query_status.ok() ? wals.size() : 0;
  };

  // Build a DB with several column families and WAL files.
  CreateAndReopenWithCF({"one", "two"}, options);
  ASSERT_OK(Put(0, "key1", kSmallValue));
  ASSERT_OK(Put(1, "key2", kLargeValue));
  ASSERT_OK(Flush(1));
  ASSERT_EQ(1, wal_file_count());
  ASSERT_OK(Put(0, "key3", kSmallValue));
  ASSERT_OK(Put(2, "key4", kLargeValue));
  ASSERT_OK(Flush(2));
  ASSERT_EQ(2, wal_file_count());

  // First reopen: everything written so far must be readable.
  options.db_write_buffer_size = 64 * 1024 * 1024;
  ReopenWithColumnFamilies(kAllColumnFamilies, options);
  ASSERT_EQ(Get(0, "key1"), kSmallValue);
  ASSERT_EQ(Get(1, "key2"), kLargeValue);
  ASSERT_EQ(Get(0, "key3"), kSmallValue);
  ASSERT_EQ(Get(2, "key4"), kLargeValue);

  // Add more data, flushing one column family in between.
  ASSERT_OK(Put(0, "key5", kLargeValue));
  ASSERT_OK(Put(1, "key6", kLargeValue));
  ASSERT_EQ(3, wal_file_count());
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(2, "key7", kLargeValue));
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_EQ(4, wal_file_count());

  // Two more reopens: data and WAL count must stay the same both times.
  for (int round = 0; round != 2; ++round) {
    ReopenWithColumnFamilies(kAllColumnFamilies, options);
    ASSERT_EQ(Get(0, "key1"), kSmallValue);
    ASSERT_EQ(Get(1, "key2"), kLargeValue);
    ASSERT_EQ(Get(0, "key3"), kSmallValue);
    ASSERT_EQ(Get(2, "key4"), kLargeValue);
    ASSERT_EQ(Get(0, "key5"), kLargeValue);
    ASSERT_EQ(Get(1, "key6"), kLargeValue);
    ASSERT_EQ(Get(2, "key7"), kLargeValue);
    ASSERT_EQ(4, wal_file_count());
  }
}
// Corrupts a WAL file, reopens with avoid_flush_during_recovery under the
// parameterized recovery mode, appends more keys, and checks that a further
// reopen yields exactly the same key/value set.
TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
       RecoverFromCorruptedWALWithoutFlush) {
  const int kAppendKeys = 100;
  using KeyValueList = std::vector<std::pair<std::string, std::string>>;

  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 64 * 1024 * 1024;

  // Dumps every key/value pair currently visible through a full scan.
  auto dump_db = [this]() {
    KeyValueList contents;
    ReadOptions read_opts;
    std::unique_ptr<Iterator> scan(dbfull()->NewIterator(read_opts));
    scan->SeekToFirst();
    while (scan->Valid()) {
      contents.emplace_back(scan->key().ToString(), scan->value().ToString());
      scan->Next();
    }
    EXPECT_OK(scan->status());
    return contents;
  };

  const auto& params = GetParam();
  const bool trunc = std::get<0>(params);
  const int corrupt_offset = std::get<1>(params);
  const int wal_file_id = std::get<2>(params);
  options.wal_recovery_mode = std::get<3>(params);
  options.wal_compression = std::get<4>(params);

  // Populate the DB, then damage the selected WAL file.
  RecoveryTestHelper::FillData(this, &options);
  RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, .1,
                                 wal_file_id, trunc);

  // A failed open is only acceptable for kAbsoluteConsistency, or for
  // kTolerateCorruptedTailRecords when the WAL was not truncated. Nothing
  // more can be checked in that case.
  if (!TryReopen(options).ok()) {
    ASSERT_TRUE(options.wal_recovery_mode ==
                    WALRecoveryMode::kAbsoluteConsistency ||
                (!trunc && options.wal_recovery_mode ==
                               WALRecoveryMode::kTolerateCorruptedTailRecords));
    return;
  }
  ASSERT_OK(TryReopen(options));

  // Append additional keys on top of the recovered state.
  for (int k = 0; k != kAppendKeys; ++k) {
    const std::string extra_key = "extra_key" + std::to_string(k);
    const std::string extra_value =
        DummyString(RecoveryTestHelper::kValueSize);
    ASSERT_OK(Put(extra_key, extra_value));
  }

  // The contents must survive one more reopen unchanged.
  const KeyValueList expected = dump_db();
  ASSERT_OK(TryReopen(options));
  const KeyValueList actual = dump_db();
  ASSERT_EQ(expected, actual);
}
// After a reopen with avoid_flush_during_recovery, pushing the combined WAL
// size past max_total_wal_size must result in two flushes being counted by
// the listener (which expects FlushReason::kWalFull).
TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
  constexpr size_t kKB = 1024;
  constexpr size_t kMB = 1024 * 1024;

  auto flush_counter = std::make_shared<FlushCounterListener>();
  flush_counter->expected_flush_reason = FlushReason::kWalFull;

  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.max_total_wal_size = 1 * kMB;
  options.listeners.push_back(flush_counter);
  CreateAndReopenWithCF({"one"}, options);

  // Fill a single WAL file to a bit more than 900KB but less than 1MB.
  const std::string value_100k(100 * kKB, 'v');
  ASSERT_OK(Put(0, "foo", "v1"));
  for (int i = 0; i != 9; ++i) {
    ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k));
  }

  VectorLogPtr wals_before;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before));
  ASSERT_EQ(1, wals_before.size());
  const uint64_t wal_bytes_before = wals_before[0]->SizeFileBytes();
  ASSERT_GT(wal_bytes_before, 900 * kKB);
  ASSERT_LT(wal_bytes_before, 1 * kMB);

  ReopenWithColumnFamilies({"default", "one"}, options);

  // A further 300KB takes the combined WAL size above 1MB.
  const std::string value_300k(300 * kKB, 'v');
  ASSERT_OK(Put(1, "bar", value_300k));

  // The old WAL is still listed first, followed by one new WAL.
  VectorLogPtr wals_after;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_after));
  ASSERT_EQ(2, wals_after.size());
  ASSERT_EQ(wals_before[0]->LogNumber(), wals_after[0]->LogNumber());
  ASSERT_GT(wals_after[0]->SizeFileBytes() + wals_after[1]->SizeFileBytes(),
            1 * kMB);

  // One more write, then wait for the flush of every column family handle.
  ASSERT_OK(Put(0, "foo", "v2"));
  for (auto* cf_handle : handles_) {
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(cf_handle));
  }
  ASSERT_EQ(2, flush_counter->count.load());
}
#if defined(ROCKSDB_PLATFORM_POSIX)
#if defined(ROCKSDB_FALLOCATE_PRESENT)
// With avoid_flush_during_recovery set, the allocated size of the single WAL
// file must drop below the preallocation block size after a reopen.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
  constexpr size_t kKB = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.avoid_flush_during_recovery = true;

  // Preconditions: not a mem env, and fallocate must be supported.
  if (mem_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
    return;
  }
  if (!IsFallocateSupported()) {
    return;
  }

  DestroyAndReopen(options);
  const size_t prealloc_bytes =
      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  ASSERT_OK(Put("foo", "v1"));

  // Before reopen: one small WAL whose allocated size is at least the
  // preallocation block size.
  VectorLogPtr wals_before;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before));
  ASSERT_EQ(1, wals_before.size());
  ASSERT_LT(wals_before[0]->SizeFileBytes(), 1 * kKB);
  const std::string wal_path = dbname_ + wals_before[0]->PathName();
  ASSERT_GE(GetAllocatedFileSize(wal_path), prealloc_bytes);

  Reopen(options);

  // After reopen: still one small WAL, and the original file's allocated size
  // is now below the preallocation block size.
  VectorLogPtr wals_after;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_after));
  ASSERT_EQ(1, wals_after.size());
  ASSERT_LT(wals_after[0]->SizeFileBytes(), 1 * kKB);
  ASSERT_LT(GetAllocatedFileSize(wal_path), prealloc_bytes);
}
// With avoid_flush_during_recovery disabled, checks the WAL file's allocated
// size in the window between "DBImpl::PurgeObsoleteFiles:Begin" and
// "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion" of a reopen running on a
// separate thread.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) {
  constexpr size_t kKB = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.avoid_flush_during_recovery = false;
  options.avoid_flush_during_shutdown = true;

  // Preconditions: not a mem env, and fallocate must be supported.
  if (mem_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
    return;
  }
  if (!IsFallocateSupported()) {
    return;
  }

  DestroyAndReopen(options);
  const size_t prealloc_bytes =
      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  ASSERT_OK(Put("foo", "v1"));

  // Before closing: one small WAL whose allocated size is at least the
  // preallocation block size.
  VectorLogPtr wals_before;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before));
  ASSERT_EQ(1, wals_before.size());
  ASSERT_LT(wals_before[0]->SizeFileBytes(), 1 * kKB);
  const std::string wal_path = dbname_ + wals_before[0]->PathName();
  ASSERT_GE(GetAllocatedFileSize(wal_path), prealloc_bytes);
  Close();

  // Order the test thread against the reopening thread:
  //   PurgeObsoleteFiles:Begin -> AfterRecover (size check runs here)
  //   AfterTruncate            -> DeleteObsoleteFileImpl::BeforeDeletion
  auto* sync_points = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_points->LoadDependency(
      {{"DBImpl::PurgeObsoleteFiles:Begin",
        "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
       {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
        "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
  sync_points->EnableProcessing();

  port::Thread reopen_thread([&]() { Reopen(options); });

  TEST_SYNC_POINT(
      "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
  // The file's allocated size must already be below the preallocation size.
  EXPECT_LT(GetAllocatedFileSize(wal_path), prealloc_bytes);
  TEST_SYNC_POINT(
      "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");

  reopen_thread.join();
  sync_points->DisableProcessing();
}
// Preallocates space in the newest (highest-numbered) WAL file of a closed DB
// and checks that, during a subsequent reopen, the file's allocated size has
// dropped below the preallocation block size by the time
// "DBImpl::PurgeObsoleteFiles:Begin" is reached.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) {
  Options options = CurrentOptions();
  options.env = env_;
  options.avoid_flush_during_recovery = false;

  // Preconditions: neither a mem env nor an encrypted env, and fallocate must
  // be supported.
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment");
    return;
  }
  if (!IsFallocateSupported()) {
    return;
  }

  DestroyAndReopen(options);
  size_t preallocated_size =
      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  Close();

  // Locate the WAL file with the highest file number in the DB directory.
  std::vector<std::string> filenames;
  std::string last_log;
  uint64_t last_log_num = 0;
  ASSERT_OK(env_->GetChildren(dbname_, &filenames));
  for (const auto& fname : filenames) {
    uint64_t number = 0;
    FileType type = kWalFile;
    if (ParseFileName(fname, &number, &type, nullptr)) {
      if (type == kWalFile && number > last_log_num) {
        // Track the running maximum; otherwise the file chosen would depend
        // on the order in which GetChildren lists the directory.
        last_log_num = number;
        last_log = fname;
      }
    }
  }
  ASSERT_NE(last_log, "");
  last_log = dbname_ + '/' + last_log;

  // Order the test thread against the reopening thread (the sync point names
  // are shared with TruncateLastLogAfterRecoverWithFlush).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::PurgeObsoleteFiles:Begin",
        "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
       {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
        "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
  // Zero the size_t handed to the "PosixWritableFile::Close" sync point.
  // NOTE(review): presumably this keeps Close() from trimming the
  // preallocated space - confirm against PosixWritableFile::Close.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PosixWritableFile::Close",
      [](void* arg) { *(static_cast<size_t*>(arg)) = 0; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Preallocate space for the log file without writing any data to it.
  std::unique_ptr<WritableFile> log_file;
  ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions()));
  log_file->SetPreallocationBlockSize(preallocated_size);
  log_file->PrepareWrite(0, 4096);
  log_file.reset();

  ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size);

  port::Thread reopen_thread([&]() { Reopen(options); });

  TEST_SYNC_POINT(
      "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
  // The file's allocated size must already be below the preallocation size.
  EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size);
  TEST_SYNC_POINT(
      "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
  reopen_thread.join();

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Opening the DB read-only must leave the WAL file's allocated size
// (within 1%) unchanged.
TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) {
  constexpr size_t kKB = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.avoid_flush_during_recovery = true;

  // Preconditions: not a mem env, and fallocate must be supported.
  if (mem_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
    return;
  }
  if (!IsFallocateSupported()) {
    return;
  }

  // While `enable_truncate` is false, zero the size_t handed to the
  // "PosixWritableFile::Close" sync point.
  std::atomic_bool enable_truncate{false};
  auto* sync_points = SyncPoint::GetInstance();
  sync_points->SetCallBack("PosixWritableFile::Close",
                           [&enable_truncate](void* arg) {
                             if (enable_truncate) {
                               return;
                             }
                             *(static_cast<size_t*>(arg)) = 0;
                           });
  sync_points->EnableProcessing();

  DestroyAndReopen(options);
  const size_t prealloc_bytes =
      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  ASSERT_OK(Put("foo", "v1"));

  // Before closing: one small WAL whose allocated size is at least the
  // preallocation block size.
  VectorLogPtr wals_before;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_before));
  ASSERT_EQ(1, wals_before.size());
  ASSERT_LT(wals_before[0]->SizeFileBytes(), 1 * kKB);
  const std::string wal_rel_path = wals_before[0]->PathName();
  const auto allocated_before = GetAllocatedFileSize(dbname_ + wal_rel_path);
  ASSERT_GE(allocated_before, prealloc_bytes);
  Close();

  // From here on the callback no longer zeroes its argument; reopen the DB
  // read-only.
  enable_truncate = true;
  ASSERT_OK(ReadOnlyReopen(options));

  // Same single WAL, and its allocated size is essentially unchanged.
  VectorLogPtr wals_after;
  ASSERT_OK(dbfull()->GetSortedWalFiles(wals_after));
  ASSERT_EQ(1, wals_after.size());
  ASSERT_LT(wals_after[0]->SizeFileBytes(), 1 * kKB);
  ASSERT_EQ(wals_after[0]->PathName(), wal_rel_path);
  ASSERT_NEAR(GetAllocatedFileSize(dbname_ + wal_rel_path), allocated_before,
              allocated_before / 100);

  sync_points->DisableProcessing();
  sync_points->ClearAllCallBacks();
}
#endif #endif
// GetSortedWalFiles() must fail when the manifest's WAL set is non-empty but
// the file system stops listing any "*.log" files.
TEST_F(DBWALTest, WalInManifestButNotInSortedWals) {
  Options options = CurrentOptions();
  options.track_and_verify_wals_in_manifest = true;
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;

  // File system wrapper that hides every entry ending in ".log" from
  // GetChildren() while the externally owned flag is true.
  bool wals_go_missing = false;
  struct MissingWalFs : public FileSystemWrapper {
    MissingWalFs(const std::shared_ptr<FileSystem>& t,
                 bool* _wals_go_missing_flag)
        : FileSystemWrapper(t), hide_wals(_wals_go_missing_flag) {}

    // Not owned; toggled by the test body.
    bool* hide_wals;

    IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
                         std::vector<std::string>* r,
                         IODebugContext* dbg) override {
      IOStatus s = target_->GetChildren(dir, io_opts, r, dbg);
      if (!s.ok() || !*hide_wals) {
        return s;
      }
      // Drop the ".log" entries, keeping the others in their original order.
      for (auto it = r->begin(); it != r->end();) {
        if (EndsWith(*it, ".log")) {
          it = r->erase(it);
        } else {
          ++it;
        }
      }
      return s;
    }

    const char* Name() const override { return "MissingWalFs"; }
  };

  auto hiding_fs =
      std::make_shared<MissingWalFs>(env_->GetFileSystem(), &wals_go_missing);
  std::unique_ptr<Env> hiding_env(NewCompositeEnv(hiding_fs));
  options.env = hiding_env.get();

  CreateAndReopenWithCF({"blah"}, options);

  // Write to and sync both column families, then flush one of them; the
  // manifest's WAL set is asserted to be non-empty afterwards.
  ASSERT_OK(Put(0, "x", "y"));
  ASSERT_OK(db_->SyncWAL());
  ASSERT_OK(Put(1, "x", "y"));
  ASSERT_OK(db_->SyncWAL());
  ASSERT_OK(Flush(1));
  ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());

  // Listing works while WALs are visible and fails once they are hidden.
  std::vector<std::unique_ptr<LogFile>> sorted_wals;
  ASSERT_OK(db_->GetSortedWalFiles(sorted_wals));
  wals_go_missing = true;
  ASSERT_NOK(db_->GetSortedWalFiles(sorted_wals));
  wals_go_missing = false;
  Close();
}
// Writes a batch containing a WAL termination point and checks, after a
// reopen, that column family 1 still holds the earlier "foo" value and has no
// "foo2".
TEST_F(DBWALTest, WalTermTest) {
  const std::vector<std::string> kColumnFamilies = {"default", "pikachu"};

  Options options = CurrentOptions();
  options.env = env_;
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));

  // Synchronous write with the WAL enabled.
  WriteOptions sync_write;
  sync_write.sync = true;
  sync_write.disableWAL = false;

  // "foo" is added before the termination point, "foo2" after it.
  WriteBatch terminated_batch;
  ASSERT_OK(terminated_batch.Put("foo", "bar"));
  terminated_batch.MarkWalTerminationPoint();
  ASSERT_OK(terminated_batch.Put("foo2", "bar2"));
  ASSERT_OK(dbfull()->Write(sync_write, &terminated_batch));

  // The DB must reopen cleanly.
  ASSERT_OK(TryReopenWithColumnFamilies(kColumnFamilies, options));
  ASSERT_EQ("bar", Get(1, "foo"));
  ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
}
// With ZSTD WAL compression and WAL tracking in the manifest enabled,
// GetSortedWalFiles() must succeed after a reopen, a WAL switch and a
// SyncWAL().
TEST_F(DBWALTest, GetCompressedWalsAfterSync) {
  // Bypass when the currently open DB reports no WAL compression.
  if (db_->GetOptions().wal_compression == kNoCompression) {
    ROCKSDB_GTEST_BYPASS("stream compression not present");
    return;
  }

  Options options = GetDefaultOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  options.avoid_flush_during_recovery = true;
  options.track_and_verify_wals_in_manifest = true;
  options.wal_compression = kZSTD;
  DestroyAndReopen(options);

  // Write one key, then reopen.
  ASSERT_OK(Put("a", "v"));
  Reopen(options);

  // Switch to a new WAL, write to it and sync it.
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_OK(Put("b", "v"));
  ASSERT_OK(db_->SyncWAL());

  // Listing the WAL files must succeed.
  VectorLogPtr sorted_wals;
  ASSERT_OK(dbfull()->GetSortedWalFiles(sorted_wals));
}
// After creating the DB and reopening it without writing anything, exactly
// one WAL file must exist in the DB directory.
TEST_F(DBWALTest, EmptyWalReopenTest) {
  Options options = CurrentOptions();
  options.env = env_;
  CreateAndReopenWithCF({"pikachu"}, options);

  // The DB must reopen cleanly.
  const std::vector<std::string> kColumnFamilies = {"default", "pikachu"};
  ASSERT_OK(TryReopenWithColumnFamilies(kColumnFamilies, options));

  // Count the directory entries that parse as WAL files.
  std::vector<std::string> children;
  int wal_count = 0;
  ASSERT_OK(env_->GetChildren(dbname_, &children));
  for (const std::string& child : children) {
    uint64_t file_number = 0;
    FileType file_type = kWalFile;
    if (!ParseFileName(child, &file_number, &file_type)) {
      continue;
    }
    if (file_type == kWalFile) {
      ++wal_count;
    }
  }
  ASSERT_EQ(wal_count, 1);
}
// Injects a metadata-write error into a synced Put while the memtable is
// empty, waits until writes succeed again, and checks that the DB reopens and
// returns the value written after the failure.
TEST_F(DBWALTest, RecoveryFlushSwitchWALOnEmptyMemtable) {
  Options options = CurrentOptions();
  auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  options.env = fault_fs_env.get();
  options.avoid_flush_during_shutdown = true;
  DestroyAndReopen(options);

  // "RecoverFromRetryableBGIOError:BeforeStart" may only proceed after this
  // test has passed its "AfterCheckMemtableEmpty" sync point.
  auto* sync_points = SyncPoint::GetInstance();
  sync_points->LoadDependency(
      {{"DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
        "AfterCheckMemtableEmpty",
        "RecoverFromRetryableBGIOError:BeforeStart"}});
  sync_points->EnableProcessing();

  // Enable metadata-write error injection for this thread.
  // NOTE(review): the positional arguments are presumably
  // (seed, one_in, retryable, has_data_loss) - confirm against
  // FaultInjectionTestFS::SetThreadLocalErrorContext.
  constexpr FaultInjectionIOType kInjectedType =
      FaultInjectionIOType::kMetadataWrite;
  fault_fs->SetThreadLocalErrorContext(kInjectedType, 7, 1, true, false);
  fault_fs->EnableThreadLocalErrorInjection(kInjectedType);

  // The synced write must fail with an IO error.
  WriteOptions sync_write;
  sync_write.sync = true;
  Status s = Put("k", "old_v", sync_write);
  ASSERT_TRUE(s.IsIOError());

  // The failed key must be in neither the (still empty) memtable nor
  // readable through Get().
  auto* default_cf =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily());
  ASSERT_TRUE(default_cf->cfd()->mem()->IsEmpty());
  ASSERT_EQ("NOT_FOUND", Get("k"));

  TEST_SYNC_POINT(
      "DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
      "AfterCheckMemtableEmpty");
  sync_points->DisableProcessing();

  fault_fs->DisableThreadLocalErrorInjection(kInjectedType);

  // Retry the write once per millisecond until it succeeds.
  while (!s.ok()) {
    options.env->SleepForMicroseconds(1000);
    s = Put("k", "new_v");
  }

  // The reopen (with flush during recovery allowed) must succeed and return
  // the new value.
  options.avoid_flush_during_recovery = false;
  Reopen(options);
  ASSERT_EQ("new_v", Get("k"));
  Destroy(options);
}
// With manual WAL flushing, an injected write error during FlushWAL() must
// leave a background error of fatal severity and must not start recovery.
TEST_F(DBWALTest, WALWriteErrorNoRecovery) {
  Options options = CurrentOptions();
  auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  options.env = fault_fs_env.get();
  options.manual_wal_flush = true;
  DestroyAndReopen(options);

  // Enable write error injection for this thread.
  // NOTE(review): the positional arguments are presumably
  // (seed, one_in, retryable, has_data_loss) - confirm against
  // FaultInjectionTestFS::SetThreadLocalErrorContext.
  constexpr FaultInjectionIOType kInjectedType = FaultInjectionIOType::kWrite;
  fault_fs->SetThreadLocalErrorContext(kInjectedType, 7, 1, true, false);
  fault_fs->EnableThreadLocalErrorInjection(kInjectedType);

  // The Put itself succeeds; the error only surfaces in the explicit WAL
  // flush.
  ASSERT_OK(Put("k", "v"));
  const Status flush_status = db_->FlushWAL(false);
  ASSERT_TRUE(flush_status.IsIOError());

  // The background error is fatal and no recovery is in progress.
  const Status bg_error = dbfull()->TEST_GetBGError();
  ASSERT_EQ(bg_error.severity(), Status::Severity::kFatalError);
  ASSERT_FALSE(dbfull()->TEST_IsRecoveryInProgress());

  fault_fs->DisableThreadLocalErrorInjection(kInjectedType);
  Destroy(options);
}
}
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest with the command-line arguments, and runs every registered test.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}