#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <cinttypes>
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/log_format.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/meta_blocks.h"
#include "table/mock_table.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/random.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
// Length, in bytes, of every value generated by CorruptionTest::Value().
static constexpr int kValueSize = 1000;
namespace {
// FileSystem wrapper that forwards to the wrapped file system, except that
// NewWritableFile() can be switched to fail with an injected IOError.
class ErrorFS : public FileSystemWrapper {
 public:
  // While true, every NewWritableFile() call fails with "fake error".
  bool writable_file_error_ = false;
  // Counts the NewWritableFile() calls that were failed by injection.
  int num_writable_file_errors_ = 0;

  explicit ErrorFS(const std::shared_ptr<FileSystem>& _target)
      : FileSystemWrapper(_target) {}

  const char* Name() const override { return "ErrorEnv"; }

  // Clears *result, then either delegates to the wrapped file system or, when
  // error injection is enabled, records the failure and returns an IOError.
  IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
                           std::unique_ptr<FSWritableFile>* result,
                           IODebugContext* dbg) override {
    result->reset();
    if (!writable_file_error_) {
      return target()->NewWritableFile(fname, opts, result, dbg);
    }
    ++num_writable_file_errors_;
    return IOStatus::IOError(fname, "fake error");
  }
};
} class CorruptionTest : public testing::Test {
public:
std::shared_ptr<Env> env_guard_;
std::shared_ptr<ErrorFS> fs_;
std::unique_ptr<Env> env_;
Env* base_env_;
std::string dbname_;
std::shared_ptr<Cache> tiny_cache_;
Options options_;
std::unique_ptr<DB> db_;
CorruptionTest() {
tiny_cache_ = NewLRUCache(100, 4);
base_env_ = Env::Default();
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env_, &env_guard_));
EXPECT_NE(base_env_, nullptr);
fs_.reset(new ErrorFS(base_env_->GetFileSystem()));
env_ = NewCompositeEnv(fs_);
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options_.env = env_.get();
dbname_ = test::PerThreadDBPath(env_.get(), "corruption_test");
Status s = DestroyDB(dbname_, options_);
EXPECT_OK(s);
db_ = nullptr;
options_.create_if_missing = true;
BlockBasedTableOptions table_options;
table_options.block_size_deviation = 0; options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen();
options_.create_if_missing = false;
}
~CorruptionTest() override {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({});
SyncPoint::GetInstance()->ClearAllCallBacks();
db_.reset();
if (getenv("KEEP_DB")) {
fprintf(stdout, "db is still at %s\n", dbname_.c_str());
} else {
Options opts;
opts.env = base_env_;
EXPECT_OK(DestroyDB(dbname_, opts));
}
}
void CloseDb() { db_.reset(); }
DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); }
Status TryReopen(Options* options = nullptr) {
db_.reset();
Options opt = (options ? *options : options_);
if (opt.env == Options().env) {
opt.env = env_.get();
}
opt.arena_block_size = 4096;
BlockBasedTableOptions table_options;
table_options.block_cache = tiny_cache_;
table_options.block_size_deviation = 0;
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
return DB::Open(opt, dbname_, &db_);
}
void Reopen(Options* options = nullptr) { ASSERT_OK(TryReopen(options)); }
void RepairDB() {
db_.reset();
ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
}
void Build(int n, int start, int flush_every) {
std::string key_space, value_space;
WriteBatch batch;
for (int i = 0; i < n; i++) {
if (flush_every != 0 && i != 0 && i % flush_every == 0) {
ASSERT_OK(dbfull()->TEST_FlushMemTable());
}
Slice key = Key(i + start, &key_space);
batch.Clear();
ASSERT_OK(batch.Put(key, Value(i + start, &value_space)));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
}
void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
void Check(int min_expected, int max_expected) {
Check(min_expected, max_expected, ReadOptions(false, true));
}
void Check(int min_expected, int max_expected, ReadOptions read_options) {
uint64_t next_expected = 0;
uint64_t missed = 0;
int bad_keys = 0;
int bad_values = 0;
int correct = 0;
std::string value_space;
Iterator* iter = db_->NewIterator(read_options);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
uint64_t key;
Slice in(iter->key());
if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
key < next_expected) {
bad_keys++;
continue;
}
missed += (key - next_expected);
next_expected = key + 1;
if (iter->value() != Value(static_cast<int>(key), &value_space)) {
bad_values++;
} else {
correct++;
}
}
iter->status().PermitUncheckedError();
delete iter;
fprintf(
stderr,
"expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
min_expected, max_expected, correct, bad_keys, bad_values,
static_cast<unsigned long long>(missed));
ASSERT_LE(min_expected, correct);
ASSERT_GE(max_expected, correct);
}
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
uint64_t number;
FileType type;
std::string fname;
int picked_number = -1;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
static_cast<int>(number) > picked_number) { fname = dbname_ + "/" + filenames[i];
picked_number = static_cast<int>(number);
}
}
ASSERT_TRUE(!fname.empty()) << filetype;
ASSERT_OK(test::CorruptFile(env_.get(), fname, offset, bytes_to_corrupt,
filetype == kTableFile));
}
void CorruptTableFileAtLevel(int level, int offset, int bytes_to_corrupt) {
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
for (const auto& m : metadata) {
if (m.level == level) {
ASSERT_OK(test::CorruptFile(env_.get(), dbname_ + "/" + m.name, offset,
bytes_to_corrupt));
return;
}
}
FAIL() << "no file found at level";
}
int Property(const std::string& name) {
std::string property;
int result;
if (db_->GetProperty(name, &property) &&
sscanf(property.c_str(), "%d", &result) == 1) {
return result;
} else {
return -1;
}
}
Slice Key(int i, std::string* storage) {
char buf[100];
snprintf(buf, sizeof(buf), "%016d", i);
storage->assign(buf, strlen(buf));
return Slice(*storage);
}
Slice Value(int k, std::string* storage) {
if (k == 0) {
*storage = std::string(kValueSize, ' ');
} else {
Random r(k);
*storage = r.RandomString(kValueSize);
}
return Slice(*storage);
}
void GetSortedWalFiles(std::vector<uint64_t>& file_nums) {
std::vector<std::string> tmp_files;
ASSERT_OK(env_->GetChildren(dbname_, &tmp_files));
FileType type = kWalFile;
for (const auto& file : tmp_files) {
uint64_t number = 0;
if (ParseFileName(file, &number, &type) && type == kWalFile) {
file_nums.push_back(number);
}
}
std::sort(file_nums.begin(), file_nums.end());
}
void CorruptFileWithTruncation(FileType file, uint64_t number,
uint64_t bytes_to_truncate = 0) {
std::string path;
switch (file) {
case FileType::kWalFile:
path = LogFileName(dbname_, number);
break;
default:
return;
}
uint64_t old_size = 0;
ASSERT_OK(env_->GetFileSize(path, &old_size));
assert(old_size > bytes_to_truncate);
uint64_t new_size = old_size - bytes_to_truncate;
if (bytes_to_truncate == 0) {
new_size = 0;
}
ASSERT_OK(test::TruncateFile(env_.get(), path, new_size));
}
};
// Corrupts one byte at offset 19 and one byte at offset
// log::kBlockSize + 1000 of the newest WAL file. Re-opening must fail with
// the fixture's default options; with paranoid_checks disabled the DB opens
// and exactly 36 of the 100 written entries are readable.
TEST_F(CorruptionTest, Recovery) {
  Build(100);
  Check(100, 100);
#ifdef OS_WIN
  // On Windows the DB is closed before the WAL file is modified.
  CloseDb();
#endif
  Corrupt(kWalFile, 19, 1);
  Corrupt(kWalFile, log::kBlockSize + 1000, 1);
  ASSERT_NOK(TryReopen());

  options_.paranoid_checks = false;
  Reopen(&options_);
  Check(36, 36);
}
// With point-in-time WAL recovery and two column families:
//   1. write to both column families and remember the newest WAL number,
//   2. truncate one byte off that WAL,
//   3. re-open with avoid_flush_during_recovery, write to and flush the
//      second column family,
//   4. verify that the DB can then be opened two more times.
TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
  CloseDb();
  options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  const std::string test_cf_name = "test_cf";
  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
  cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());

  uint64_t log_num = 0;
  {
    options_.create_missing_column_families = true;
    std::vector<ColumnFamilyHandle*> cfhs;
    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
    assert(db_ != nullptr);
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
    // Snapshot the WAL list while the DB is still open.
    std::vector<uint64_t> file_nums;
    GetSortedWalFiles(file_nums);
    for (auto* cfh : cfhs) {
      delete cfh;
    }
    CloseDb();
    // GetSortedWalFiles() can return early on an internal assertion failure
    // and leave the vector empty; back() on an empty vector is undefined.
    // The check runs after the handles and DB are released so that a failure
    // here does not leak them.
    ASSERT_FALSE(file_nums.empty());
    log_num = file_nums.back();
  }

  // Drop the last byte of the newest WAL captured above.
  CorruptFileWithTruncation(FileType::kWalFile, log_num,
                            /*bytes_to_truncate=*/1);

  {
    options_.avoid_flush_during_recovery = true;
    std::vector<ColumnFamilyHandle*> cfhs;
    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
    assert(db_ != nullptr);
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
    ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
    for (auto* cfh : cfhs) {
      delete cfh;
    }
    CloseDb();
  }

  // The DB must open successfully twice in a row after the steps above.
  for (int i = 0; i < 2; ++i) {
    std::vector<ColumnFamilyHandle*> cfhs;
    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
    assert(db_ != nullptr);
    for (auto* cfh : cfhs) {
      delete cfh;
    }
    CloseDb();
  }
}
// Re-opening the DB must fail while ErrorFS rejects every new writable file.
TEST_F(CorruptionTest, RecoverWriteError) {
  fs_->writable_file_error_ = true;
  ASSERT_NOK(TryReopen());
}
// With writable-file creation failing, issues slightly more than one default
// write buffer's worth of kValueSize-byte writes. Once a write fails, all
// subsequent writes must fail as well, and at least one file-creation error
// must have been injected. Afterwards the DB must re-open cleanly.
TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
  fs_->writable_file_error_ = true;
  const int write_count =
      static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
  std::string value_buf;
  Status last_status;
  bool seen_failure = false;
  for (int n = 0; n < write_count; n++) {
    WriteBatch batch;
    ASSERT_OK(batch.Put("a", Value(100, &value_buf)));
    last_status = db_->Write(WriteOptions(), &batch);
    const bool write_ok = last_status.ok();
    if (!write_ok) {
      seen_failure = true;
    }
    // A successful write after an earlier failure is not allowed.
    ASSERT_FALSE(seen_failure && write_ok);
  }
  ASSERT_NOK(last_status);
  ASSERT_GE(fs_->num_writable_file_errors_, 1);

  fs_->writable_file_error_ = false;
  Reopen();
}
// Flushes 100 entries, compacts levels 0 and 1, then corrupts one byte at
// offset 100 of the newest table file. Exactly 99 entries must remain
// readable and VerifyChecksum() must report an error.
TEST_F(CorruptionTest, TableFile) {
  Build(100);
  DBImpl* impl = dbfull();
  ASSERT_OK(impl->TEST_FlushMemTable());
  for (int level : {0, 1}) {
    ASSERT_OK(impl->TEST_CompactRange(level, nullptr, nullptr));
  }

  Corrupt(kTableFile, 100, 1);
  Check(99, 99);
  ASSERT_NOK(db_->VerifyChecksum());
}
// Counts the random reads issued by VerifyChecksum() through a SpecialEnv,
// first with default ReadOptions and then with a 32 KiB readahead_size, and
// finally checks that VerifyChecksum() also succeeds with allow_mmap_reads.
TEST_F(CorruptionTest, VerifyChecksumReadahead) {
Options options;
options.level_compaction_dynamic_level_bytes = false;
// senv must outlive the DB opened with it; the DB is closed at the end of
// this test, before senv goes out of scope.
SpecialEnv senv(base_env_);
options.env = &senv;
BlockBasedTableOptions table_options_no_bc;
table_options_no_bc.no_block_cache = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options_no_bc));
Reopen(&options);
Build(10000);
DBImpl* dbi = dbfull();
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
// Start counting random reads from zero.
senv.count_random_reads_ = true;
senv.random_read_counter_.Reset();
ASSERT_OK(db_->VerifyChecksum());
// Default ReadOptions: at least one but fewer than 60 random reads.
ASSERT_GT(senv.random_read_counter_.Read(), 0);
ASSERT_LT(senv.random_read_counter_.Read(), 60);
// NOTE(review): this zeroes random_read_bytes_counter_, not
// random_read_counter_, so the bounds asserted below appear to apply to the
// cumulative read count of both VerifyChecksum() calls - confirm intended.
senv.random_read_bytes_counter_ = 0;
ReadOptions ro;
ro.readahead_size = size_t{32 * 1024};
ASSERT_OK(db_->VerifyChecksum(ro));
// With 32 KiB readahead the counter must lie within [213, 447].
ASSERT_GE(senv.random_read_counter_.Read(), 213);
ASSERT_LE(senv.random_read_counter_.Read(), 447);
// VerifyChecksum() must also succeed when reads are memory-mapped.
options.allow_mmap_reads = true;
Reopen(&options);
ASSERT_OK(db_->VerifyChecksum(ro));
CloseDb();
}
// Builds 10000 entries with a flush after the first 5000, flushes the rest,
// then corrupts 500 bytes at offset -2000 of the newest table file. Without
// paranoid checks the DB opens and between 0 and 5000 entries are readable;
// VerifyChecksum() fails, and a re-open with the fixture's default options
// reports Corruption.
TEST_F(CorruptionTest, TableFileIndexData) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  // Large write buffer; flushes in this test are triggered explicitly.
  opts.write_buffer_size = 100 * 1024 * 1024;
  Reopen(&opts);

  Build(10000, 5000);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());

  Corrupt(kTableFile, -2000, 500);
  opts.paranoid_checks = false;
  Reopen(&opts);
  Check(0, 5000, ReadOptions(true, true));
  ASSERT_NOK(db_->VerifyChecksum());

  const Status reopen_status = TryReopen();
  ASSERT_TRUE(reopen_status.IsCorruption());
}
// Corrupts the last 100 bytes of the newest table file. Re-opening must fail
// with a Corruption status whose message mentions both "magic number" and
// ".sst".
TEST_F(CorruptionTest, TableFileFooterMagic) {
  Build(100);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());
  Check(100, 100);

  Corrupt(kTableFile, -100, 100);
  const Status reopen_status = TryReopen();
  ASSERT_TRUE(reopen_status.IsCorruption());
  ASSERT_NE(reopen_status.ToString().find("magic number"), std::string::npos);
  ASSERT_NE(reopen_status.ToString().find(".sst"), std::string::npos);
}
// Corrupts 92 bytes starting at offset -100 of the newest table file (the
// final 8 bytes are left untouched). Re-opening must fail with a Corruption
// status whose message mentions both "format_version" and ".sst".
TEST_F(CorruptionTest, TableFileFooterNotMagic) {
  Build(100);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());
  Check(100, 100);

  Corrupt(kTableFile, -100, 92);
  const Status reopen_status = TryReopen();
  ASSERT_TRUE(reopen_status.IsCorruption());
  ASSERT_NE(reopen_status.ToString().find("format_version"),
            std::string::npos);
  ASSERT_NE(reopen_status.ToString().find(".sst"), std::string::npos);
}
// Appends extra bytes to a flushed table file of a two-column-family DB.
// With paranoid_checks the open must fail with a "file size mismatch"
// Corruption; without them the open succeeds, the second column family's
// key is still readable, and reads from the default column family fail.
TEST_F(CorruptionTest, DBOpenWithWrongFileSize) {
  CloseDb();
  const std::string test_cf_name = "test_cf";
  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
  cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());

  {
    options_.create_missing_column_families = true;
    std::vector<ColumnFamilyHandle*> cfhs;
    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
    assert(db_ != nullptr);
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k1", "v1"));
    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
    for (auto* cfh : cfhs) {
      delete cfh;
    }
    ASSERT_OK(dbfull()->TEST_FlushMemTable());

    std::vector<LiveFileMetaData> metadata;
    db_->GetLiveFilesMetaData(&metadata);
    // Guard the metadata[0] access below, as the sibling tests that index
    // the live-file list do.
    ASSERT_FALSE(metadata.empty());
    std::string filename = dbname_ + metadata[0].name;
    const auto& fs = options_.env->GetFileSystem();
    {
      // Make the file longer than the size recorded for it.
      std::unique_ptr<FSWritableFile> f;
      ASSERT_OK(fs->ReopenWritableFile(filename, FileOptions(), &f, nullptr));
      ASSERT_OK(f->Append("blahblah", IOOptions(), nullptr));
      ASSERT_OK(f->Close(IOOptions(), nullptr));
    }
    CloseDb();
  }

  // Paranoid mode: the size mismatch must prevent the DB from opening.
  options_.paranoid_checks = true;
  std::vector<ColumnFamilyHandle*> cfhs;
  Status s;
  s = DB::Open(options_, dbname_, cf_descs, &cfhs, &db_);
  ASSERT_TRUE(s.IsCorruption());
  ASSERT_TRUE(s.ToString().find("file size mismatch") != std::string::npos);

  // Non-paranoid mode: the DB opens and the other column family is readable.
  options_.paranoid_checks = false;
  ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  assert(db_ != nullptr);
  std::string v;
  ASSERT_OK(db_->Get(ReadOptions(), cfhs[1], "k1", &v));
  ASSERT_EQ(v, "v1");

  // Nothing is readable from the default column family.
  Check(0, 0);
  s = db_->Get(ReadOptions(), cfhs[0], "k1", &v);
  ASSERT_TRUE(s.IsCorruption());
  delete cfhs[1];
  delete cfhs[0];
}
// Changes the on-disk size of the single live table file, first by appending
// bytes and then by truncating 8 and 100 bytes below the recorded size. In
// every case a paranoid open must fail with a "file size mismatch"
// Corruption, while a non-paranoid open succeeds with no readable entries.
TEST_F(CorruptionTest, TableFileWrongSize) {
  Build(100);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());
  Check(100, 100);

  std::vector<LiveFileMetaData> live_files;
  db_->GetLiveFilesMetaData(&live_files);
  ASSERT_EQ(1U, live_files.size());
  std::string sst_path = dbname_ + live_files[0].name;
  const auto& fs = options_.env->GetFileSystem();
  {
    // Grow the file beyond its recorded size.
    std::unique_ptr<FSWritableFile> writer;
    ASSERT_OK(
        fs->ReopenWritableFile(sst_path, FileOptions(), &writer, nullptr));
    ASSERT_OK(writer->Append("blahblah", IOOptions(), nullptr));
    ASSERT_OK(writer->Close(IOOptions(), nullptr));
  }

  // Non-paranoid: opens, but nothing can be read.
  options_.paranoid_checks = false;
  Reopen();
  Check(0, 0);
  std::string value;
  Status status = db_->Get(ReadOptions(), "k1", &value);
  ASSERT_TRUE(status.IsCorruption());

  // Paranoid: the open itself is rejected.
  options_.paranoid_checks = true;
  status = TryReopen();
  ASSERT_TRUE(status.IsCorruption());
  ASSERT_NE(status.ToString().find("file size mismatch"), std::string::npos);

  // Repeat with the file shorter than its recorded size.
  for (size_t bytes_lost : {8, 100}) {
    ASSERT_OK(test::TruncateFile(env_.get(), sst_path,
                                 live_files[0].size - bytes_lost));

    options_.paranoid_checks = true;
    status = TryReopen();
    ASSERT_TRUE(status.IsCorruption());
    ASSERT_NE(status.ToString().find("file size mismatch"),
              std::string::npos);

    options_.paranoid_checks = false;
    Reopen();
    Check(0, 0);
  }
}
// All entries written before RepairDB() must still be readable after the
// repaired DB is re-opened.
TEST_F(CorruptionTest, MissingDescriptor) {
  constexpr int kNumEntries = 1000;
  Build(kNumEntries);
  RepairDB();
  Reopen();
  Check(kNumEntries, kNumEntries);
}
// Overwrites one key five times, repairs and re-opens the DB, and checks
// that the latest value wins. A further overwrite after the repair must be
// visible immediately and after another re-open.
TEST_F(CorruptionTest, SequenceNumberRecovery) {
  for (const char* value : {"v1", "v2", "v3", "v4", "v5"}) {
    ASSERT_OK(db_->Put(WriteOptions(), "foo", value));
  }
  RepairDB();
  Reopen();

  std::string fetched;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &fetched));
  ASSERT_EQ("v5", fetched);

  // A write made after the repair must supersede the recovered value.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &fetched));
  ASSERT_EQ("v6", fetched);

  Reopen();
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &fetched));
  ASSERT_EQ("v6", fetched);
}
// Corrupts the first 1000 bytes of the newest descriptor file: re-opening
// must fail, and after RepairDB() the previously written key must be
// readable again.
TEST_F(CorruptionTest, CorruptedDescriptor) {
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
  ASSERT_OK(dbfull()->TEST_FlushMemTable());

  CompactRangeOptions compact_opts;
  compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                              nullptr, nullptr));

  Corrupt(kDescriptorFile, 0, 1000);
  ASSERT_NOK(TryReopen());

  RepairDB();
  Reopen();
  std::string fetched;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &fetched));
  ASSERT_EQ("hello", fetched);
}
// Moves 10 entries into a single level-2 file and corrupts one byte of it:
// 9 entries stay readable. After rewriting 10000 entries all of them are
// readable again, but VerifyChecksum() still reports an error.
TEST_F(CorruptionTest, CompactionInputError) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  Reopen(&opts);

  Build(10);
  DBImpl* impl = dbfull();
  ASSERT_OK(impl->TEST_FlushMemTable());
  for (int level : {0, 1}) {
    ASSERT_OK(impl->TEST_CompactRange(level, nullptr, nullptr));
  }
  ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));

  Corrupt(kTableFile, 100, 1);
  Check(9, 9);
  ASSERT_NOK(db_->VerifyChecksum());

  Build(10000);
  Check(10000, 10000);
  ASSERT_NOK(db_->VerifyChecksum());
}
// With paranoid_checks enabled, populates every level below level 0 with a
// file spanning "" .. "~", then writes 10 entries into a single level-0 file
// and corrupts it. Subsequent writes must start failing and, once one write
// has failed, every later write must fail too.
TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_checks = true;
  opts.write_buffer_size = 131072;
  opts.max_write_buffer_number = 2;
  Reopen(&opts);
  DBImpl* impl = dbfull();

  // For each pass, flush a two-key file and push it down through
  // (NumberLevels() - pass) consecutive compactions starting at level 0.
  const int num_levels = db_->NumberLevels();
  for (int pass = 1; pass < num_levels; pass++) {
    ASSERT_OK(db_->Put(WriteOptions(), "", "begin"));
    ASSERT_OK(db_->Put(WriteOptions(), "~", "end"));
    ASSERT_OK(impl->TEST_FlushMemTable());
    const int compactions = num_levels - pass;
    for (int from_level = 0; from_level < compactions; ++from_level) {
      ASSERT_OK(impl->TEST_CompactRange(from_level, nullptr, nullptr));
    }
  }

  Reopen(&opts);
  impl = dbfull();
  Build(10);
  ASSERT_OK(impl->TEST_FlushMemTable());
  ASSERT_OK(impl->TEST_WaitForCompact());
  ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));

  CorruptTableFileAtLevel(0, 100, 1);
  Check(9, 9);
  ASSERT_NOK(db_->VerifyChecksum());

  Status last_status;
  std::string key_buf, value_buf;
  bool seen_failure = false;
  for (int n = 0; n < 10000; n++) {
    last_status =
        db_->Put(WriteOptions(), Key(n, &key_buf), Value(n, &value_buf));
    const bool write_ok = last_status.ok();
    if (!write_ok) {
      seen_failure = true;
    }
    // A successful write after an earlier failure is not allowed.
    ASSERT_FALSE(seen_failure && write_ok)
        << "write did not fail in a corrupted db";
  }
  ASSERT_FALSE(last_status.ok())
      << "write did not fail in corrupted paranoid db";
}
// After one byte of a flushed table file is corrupted, a key outside that
// file's contents (key number 1000) can still be written and read back, both
// before and after it is flushed.
TEST_F(CorruptionTest, UnrelatedKeys) {
  Build(10);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());
  Corrupt(kTableFile, 100, 1);
  ASSERT_NOK(db_->VerifyChecksum());

  std::string key_buf, value_buf;
  ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &key_buf),
                     Value(1000, &value_buf)));

  // Readable while the entry has not yet been flushed...
  std::string fetched;
  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &key_buf), &fetched));
  ASSERT_EQ(Value(1000, &value_buf).ToString(), fetched);

  // ...and still readable after the flush.
  ASSERT_OK(dbfull()->TEST_FlushMemTable());
  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &key_buf), &fetched));
  ASSERT_EQ(Value(1000, &value_buf).ToString(), fetched);
}
// Flushes a single range deletion, locates the range-deletion meta block in
// the resulting table file, and corrupts one byte at that block's offset.
// The DB opens before the corruption and reports Corruption after it.
TEST_F(CorruptionTest, RangeDeletionCorrupted) {
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
  ASSERT_OK(db_->Flush(FlushOptions()));

  std::vector<LiveFileMetaData> live_files;
  db_->GetLiveFilesMetaData(&live_files);
  ASSERT_EQ(size_t{1}, live_files.size());
  std::string sst_path = dbname_ + live_files[0].name;

  // Open the table file directly to look up the meta block handle.
  FileOptions file_opts;
  const auto& fs = options_.env->GetFileSystem();
  std::unique_ptr<RandomAccessFileReader> reader;
  ASSERT_OK(RandomAccessFileReader::Create(fs, sst_path, file_opts, &reader,
                                           nullptr));
  uint64_t sst_size;
  ASSERT_OK(
      fs->GetFileSize(sst_path, file_opts.io_options, &sst_size, nullptr));

  BlockHandle range_del_handle;
  const ReadOptions read_options;
  ASSERT_OK(FindMetaBlockInFile(reader.get(), sst_size,
                                kBlockBasedTableMagicNumber,
                                ImmutableOptions(options_), read_options,
                                kRangeDelBlockName, &range_del_handle));

  // Sanity check: the untouched file opens fine.
  ASSERT_OK(TryReopen());

  const int corrupt_offset = static_cast<int>(range_del_handle.offset());
  ASSERT_OK(test::CorruptFile(env_.get(), sst_path, corrupt_offset, 1));
  const Status reopen_status = TryReopen();
  ASSERT_TRUE(reopen_status.IsCorruption());
}
// Runs two scenarios against a freshly flushed table file with
// paranoid_checks enabled: (0) the file is replaced by a short bogus file,
// (1) the file is deleted. In both cases re-opening must report Corruption.
TEST_F(CorruptionTest, FileSystemStateCorrupted) {
  for (int scenario = 0; scenario < 2; ++scenario) {
    Options opts;
    opts.level_compaction_dynamic_level_bytes = false;
    opts.env = env_.get();
    opts.paranoid_checks = true;
    opts.create_if_missing = true;
    Reopen(&opts);
    Build(10);
    ASSERT_OK(db_->Flush(FlushOptions()));

    std::vector<LiveFileMetaData> live_files;
    db_->GetLiveFilesMetaData(&live_files);
    ASSERT_GT(live_files.size(), 0);
    std::string sst_path = dbname_ + live_files[0].name;
    db_.reset();

    if (scenario != 0) {
      // Scenario 1: remove the table file altogether.
      ASSERT_OK(env_->DeleteFile(sst_path));
    } else {
      // Scenario 0: overwrite the table file with unrelated content.
      std::unique_ptr<WritableFile> bogus;
      ASSERT_OK(env_->NewWritableFile(sst_path, &bogus, EnvOptions()));
      ASSERT_OK(bogus->Append(Slice("corrupted sst")));
      bogus.reset();
    }
    Status reopen_status = TryReopen(&opts);
    ASSERT_TRUE(reopen_status.IsCorruption());

    ASSERT_OK(DestroyDB(dbname_, options_));
  }
}
// MockTableFactory corruption modes iterated by the ParanoidFileChecksOnFlush
// and ParanoidFileChecksOnCompact tests; kCorruptNone is the only mode for
// which those tests expect success.
static const auto& corruption_modes = {
mock::MockTableFactory::kCorruptNone, mock::MockTableFactory::kCorruptKey,
mock::MockTableFactory::kCorruptValue,
mock::MockTableFactory::kCorruptReorderKey};
// For each corruption mode, recreates the DB on a MockTableFactory that is
// already in that mode and flushes 10 entries with paranoid_file_checks on.
// The flush must succeed only for kCorruptNone.
TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = true;
  opts.create_if_missing = true;

  Status status;
  for (const auto& mode : corruption_modes) {
    // Start every mode from an empty DB.
    db_.reset();
    status = DestroyDB(dbname_, opts);
    ASSERT_OK(status);

    auto mock_factory = std::make_shared<mock::MockTableFactory>();
    opts.table_factory = mock_factory;
    mock_factory->SetCorruptionMode(mode);
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
    assert(db_ != nullptr);

    Build(10);
    status = db_->Flush(FlushOptions());
    if (mode != mock::MockTableFactory::kCorruptNone) {
      ASSERT_NOK(status);
    } else {
      ASSERT_OK(status);
    }
  }
}
// For each corruption mode, recreates the DB on a MockTableFactory, writes
// and flushes 100 entries with the factory uncorrupted, then switches the
// factory to the mode and forces a full compaction with
// paranoid_file_checks on. The compaction must succeed only for
// kCorruptNone.
TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = true;
  opts.create_if_missing = true;

  Status status;
  for (const auto& mode : corruption_modes) {
    // Start every mode from an empty DB.
    db_.reset();
    status = DestroyDB(dbname_, opts);
    ASSERT_OK(status);

    auto mock_factory = std::make_shared<mock::MockTableFactory>();
    opts.table_factory = mock_factory;
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
    assert(db_ != nullptr);

    Build(100, 2);
    ASSERT_OK(dbfull()->TEST_FlushMemTable());

    // Corruption is only enabled for the compaction itself.
    mock_factory->SetCorruptionMode(mode);
    CompactRangeOptions compact_opts;
    compact_opts.bottommost_level_compaction =
        BottommostLevelCompaction::kForce;
    status = db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                               nullptr, nullptr);
    if (mode != mock::MockTableFactory::kCorruptNone) {
      ASSERT_NOK(status);
    } else {
      ASSERT_OK(status);
    }
  }
}
// With paranoid_file_checks on, issues three range deletions (a snapshot is
// taken after the first) before writing 10 entries, then either flushes or
// flushes and force-compacts. Both paths must succeed.
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = true;
  opts.create_if_missing = true;

  for (bool do_flush : {true, false}) {
    db_.reset();
    ASSERT_OK(DestroyDB(dbname_, opts));
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
    std::string begin_key, end_key;
    assert(db_ != nullptr);

    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(3, &begin_key), Key(7, &end_key)));
    // Snapshot taken between the first and second range deletion.
    auto snapshot = db_->GetSnapshot();
    ASSERT_NE(snapshot, nullptr);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(8, &begin_key), Key(9, &end_key)));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(2, &begin_key), Key(5, &end_key)));
    Build(10);

    if (!do_flush) {
      ASSERT_OK(dbfull()->TEST_FlushMemTable());
      CompactRangeOptions compact_opts;
      compact_opts.bottommost_level_compaction =
          BottommostLevelCompaction::kForce;
      ASSERT_OK(db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                                  nullptr, nullptr));
    } else {
      ASSERT_OK(db_->Flush(FlushOptions()));
    }
    db_->ReleaseSnapshot(snapshot);
  }
}
// With paranoid_file_checks on, writes key numbers 0..9, issues four range
// deletions (a snapshot is taken after the first), writes key numbers
// 10..19, then either flushes or flushes and force-compacts. Both paths must
// succeed.
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = true;
  opts.create_if_missing = true;

  for (bool do_flush : {true, false}) {
    db_.reset();
    ASSERT_OK(DestroyDB(dbname_, opts));
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
    assert(db_ != nullptr);

    Build(10, 0, 0);
    std::string begin_key, end_key;
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(5, &begin_key), Key(15, &end_key)));
    // Snapshot taken between the first and second range deletion.
    auto snapshot = db_->GetSnapshot();
    ASSERT_NE(snapshot, nullptr);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(8, &begin_key), Key(9, &end_key)));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(12, &begin_key), Key(17, &end_key)));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(2, &begin_key), Key(4, &end_key)));
    Build(10, 10, 0);

    if (!do_flush) {
      ASSERT_OK(dbfull()->TEST_FlushMemTable());
      CompactRangeOptions compact_opts;
      compact_opts.bottommost_level_compaction =
          BottommostLevelCompaction::kForce;
      ASSERT_OK(db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                                  nullptr, nullptr));
    } else {
      ASSERT_OK(db_->Flush(FlushOptions()));
    }
    db_->ReleaseSnapshot(snapshot);
  }
}
// With paranoid_file_checks on, writes 10 entries first and then issues
// three range deletions (a snapshot is taken after the first), then either
// flushes or flushes and force-compacts. Both paths must succeed.
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = true;
  opts.create_if_missing = true;

  for (bool do_flush : {true, false}) {
    db_.reset();
    ASSERT_OK(DestroyDB(dbname_, opts));
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
    assert(db_ != nullptr);

    std::string begin_key, end_key;
    Build(10);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(3, &begin_key), Key(7, &end_key)));
    // Snapshot taken between the first and second range deletion.
    auto snapshot = db_->GetSnapshot();
    ASSERT_NE(snapshot, nullptr);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(6, &begin_key), Key(8, &end_key)));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               Key(2, &begin_key), Key(5, &end_key)));

    if (!do_flush) {
      ASSERT_OK(dbfull()->TEST_FlushMemTable());
      CompactRangeOptions compact_opts;
      compact_opts.bottommost_level_compaction =
          BottommostLevelCompaction::kForce;
      ASSERT_OK(db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                                  nullptr, nullptr));
    } else {
      ASSERT_OK(db_->Flush(FlushOptions()));
    }
    db_->ReleaseSnapshot(snapshot);
  }
}
// With allow_data_in_errors enabled and a MockTableFactory in kCorruptKey
// mode, a forced full compaction over flushed data must fail with a
// Corruption status.
TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.create_if_missing = true;
  opts.allow_data_in_errors = true;
  const auto corruption_mode = mock::MockTableFactory::kCorruptKey;

  db_.reset();
  ASSERT_OK(DestroyDB(dbname_, opts));

  auto mock_factory = std::make_shared<mock::MockTableFactory>();
  mock_factory->SetCorruptionMode(corruption_mode);
  opts.table_factory = mock_factory;
  ASSERT_OK(DB::Open(opts, dbname_, &db_));
  assert(db_ != nullptr);

  Build(100, 2);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());

  CompactRangeOptions compact_opts;
  compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  const Status compact_status = db_->CompactRange(
      compact_opts, db_->DefaultColumnFamily(), nullptr, nullptr);
  ASSERT_NOK(compact_status);
  ASSERT_TRUE(compact_status.IsCorruption());
}
// Writes and flushes data while the MockTableFactory is in
// kCorruptReorderKey mode, switches the factory back to kCorruptNone, and
// expects a forced full compaction to fail even though paranoid_file_checks
// is off.
TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  opts.paranoid_file_checks = false;
  opts.create_if_missing = true;

  db_.reset();
  ASSERT_OK(DestroyDB(dbname_, opts));

  auto mock_factory = std::make_shared<mock::MockTableFactory>();
  opts.table_factory = mock_factory;
  ASSERT_OK(DB::Open(opts, dbname_, &db_));
  assert(db_ != nullptr);

  // Files written from here on are produced in reorder-key mode.
  mock_factory->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
  Build(100, 2);
  ASSERT_OK(dbfull()->TEST_FlushMemTable());

  // The compaction itself runs with corruption injection turned off.
  mock_factory->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
  CompactRangeOptions compact_opts;
  compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_NOK(db_->CompactRange(compact_opts, db_->DefaultColumnFamily(),
                               nullptr, nullptr));
}
// Writes four keys into the DB opened by the fixture, then uses the
// "MemTableIterator::Next:0" sync point to step the memtable iterator
// backwards twice on its third invocation. The flush must fail as a result.
//
// The original body also configured a local `Options` object that was never
// passed to any call; it has been removed because it wrongly suggested that
// the test runs under that configuration.
TEST_F(CorruptionTest, FlushKeyOrderCheck) {
  ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
  ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
  ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
  ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));

  int cnt = 0;
  // The callback receives the underlying MemTableRep iterator as `arg`.
  SyncPoint::GetInstance()->SetCallBack(
      "MemTableIterator::Next:0", [&](void* arg) {
        MemTableRep::Iterator* mem_iter =
            static_cast<MemTableRep::Iterator*>(arg);
        if (++cnt == 3) {
          // Rewind two entries exactly once, on the third callback.
          mem_iter->Prev();
          mem_iter->Prev();
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = dbfull()->TEST_FlushMemTable();
  ASSERT_NOK(s);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// With a CRC32c file-checksum generator configured, VerifyFileChecksums()
// passes on intact files. After one byte at offset 0 of the newest table
// file is corrupted the DB still opens, but VerifyFileChecksums() reports
// Corruption and the mismatch sync point fires exactly once with a non-OK
// status.
TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
  CloseDb();
  Options opts;
  opts.level_compaction_dynamic_level_bytes = false;
  opts.env = env_.get();
  ASSERT_OK(DestroyDB(dbname_, opts));
  opts.create_if_missing = true;
  opts.file_checksum_gen_factory =
      ROCKSDB_NAMESPACE::GetFileChecksumGenCrc32cFactory();
  Reopen(&opts);

  Build(10, 5);
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
  CloseDb();

  Corrupt(kTableFile, 0, 1);
  ASSERT_OK(TryReopen(&opts));

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  int mismatch_calls{0};
  sync_point->SetCallBack(
      "DBImpl::VerifyFullFileChecksum:mismatch", [&](void* arg) {
        auto* reported = static_cast<Status*>(arg);
        ASSERT_NE(reported, nullptr);
        ++mismatch_calls;
        ASSERT_NOK(*reported);
      });
  sync_point->EnableProcessing();

  const Status verify_status = db_->VerifyFileChecksums(ReadOptions());
  ASSERT_TRUE(verify_status.IsCorruption());
  ASSERT_EQ(1, mismatch_calls);
}
// Parameterized variant of CorruptionTest. The test parameter is a
// (avoid_flush_during_recovery, track_and_verify_wals_in_manifest) tuple
// whose elements are captured into const members at construction.
class CrashDuringRecoveryWithCorruptionTest
    : public CorruptionTest,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  explicit CrashDuringRecoveryWithCorruptionTest()
      : avoid_flush_during_recovery_(std::get<0>(GetParam())),
        track_and_verify_wals_in_manifest_(std::get<1>(GetParam())) {}

 protected:
  // First tuple element.
  const bool avoid_flush_during_recovery_;
  // Second tuple element.
  const bool track_and_verify_wals_in_manifest_;
};
// Runs every CrashDuringRecoveryWithCorruptionTest case with all four
// combinations of (avoid_flush_during_recovery,
// track_and_verify_wals_in_manifest).
INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, false),
std::make_tuple(true, true),
std::make_tuple(false, true)));
// Scenario, using point-in-time WAL recovery and two column families:
//   1. Flush one key into the second column family, then write "key0" and
//      "key1" to the default column family, switching memtables after each.
//   2. Truncate 8 bytes off the second-newest WAL.
//   3. Open with avoid_flush_during_recovery = true while an IOError is
//      injected at sync point "DBImpl::GetLogSizeAndMaybeTruncate:0"; the
//      open must fail with that error.
//   4. Open again with the parameterized avoid_flush_during_recovery value;
//      the open must succeed with "old_key" and "key0" readable and "key1"
//      not found.
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
  CloseDb();
  Options options;
  options.level_compaction_dynamic_level_bytes = false;
  options.track_and_verify_wals_in_manifest =
      track_and_verify_wals_in_manifest_;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  options.avoid_flush_during_recovery = false;
  options.env = env_.get();
  ASSERT_OK(DestroyDB(dbname_, options));
  options.create_if_missing = true;
  options.max_write_buffer_number = 8;
  Reopen(&options);

  // Create the second column family, then close so that the DB can be
  // re-opened with both column families listed.
  Status s;
  const std::string test_cf_name = "test_cf";
  ColumnFamilyHandle* cfh = nullptr;
  s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  ASSERT_OK(s);
  delete cfh;
  CloseDb();

  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  cf_descs.emplace_back(test_cf_name, options);
  std::vector<ColumnFamilyHandle*> handles;

  // Step 1: populate both column families.
  {
    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
    auto* dbimpl = dbfull();
    assert(dbimpl);
    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
    ASSERT_OK(db_->Flush(FlushOptions(), handles[1]));
    for (int i = 0; i < 2; ++i) {
      ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
                         "value" + std::to_string(i)));
      ASSERT_OK(dbimpl->TEST_SwitchMemtable());
    }
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    CloseDb();
  }

  // Step 2: damage the second-newest WAL.
  {
    std::vector<uint64_t> file_nums;
    GetSortedWalFiles(file_nums);
    size_t size = file_nums.size();
    // Checked with a gtest assertion rather than assert() so that the index
    // below cannot underflow in NDEBUG builds.
    ASSERT_GE(size, 2U);
    uint64_t log_num = file_nums[size - 2];
    CorruptFileWithTruncation(FileType::kWalFile, log_num,
                              /*bytes_to_truncate=*/8);
  }

  // Step 3: recovery interrupted by an injected IOError.
  {
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
          auto* tmp_s = static_cast<Status*>(arg);
          assert(tmp_s);
          *tmp_s = Status::IOError("Injected");
        });
    SyncPoint::GetInstance()->EnableProcessing();
    handles.clear();
    options.avoid_flush_during_recovery = true;
    s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
    ASSERT_TRUE(s.IsIOError());
    ASSERT_EQ("IO error: Injected", s.ToString());
    for (auto* h : handles) {
      delete h;
    }
    CloseDb();
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }

  // Step 4: a clean open must succeed and expose the expected keys.
  {
    options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
    {
      std::string v;
      ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
      ASSERT_EQ("dontcare", v);
      v.clear();
      ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
      ASSERT_EQ("value" + std::to_string(0), v);
      v.clear();
      // "key1" must not be visible after recovery.
      ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
                Status::NotFound());
    }
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    CloseDb();
  }
}
// Exercises point-in-time WAL recovery of a TransactionDB across two
// successive "crashes": a truncated WAL followed by a recovery attempt that is
// itself aborted by an injected I/O error.
//
// Steps:
//   1. Populate the default CF and "test_cf" through a TransactionDB,
//      switching the memtable after each default-CF write.
//   2. Truncate the second-newest WAL file.
//   3. Reopen with an IOError injected at "DBImpl::Open::BeforeSyncWAL" and
//      check that the open fails with exactly that error.
//   4. Truncate the newest WAL file.
//   5. Reopen without injection and check which keys are visible.
TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
  CloseDb();
  Options options;
  options.level_compaction_dynamic_level_bytes = false;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  options.track_and_verify_wals_in_manifest =
      track_and_verify_wals_in_manifest_;
  options.avoid_flush_during_recovery = false;
  options.env = env_.get();
  ASSERT_OK(DestroyDB(dbname_, options));
  options.create_if_missing = true;
  options.max_write_buffer_number = 3;
  Reopen(&options);

  // Create the extra column family through the plain DB, then close it so the
  // database can be reopened as a TransactionDB with both column families.
  ColumnFamilyHandle* cfh = nullptr;
  const std::string test_cf_name = "test_cf";
  Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  ASSERT_OK(s);
  delete cfh;
  CloseDb();

  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  cf_descs.emplace_back(test_cf_name, options);
  std::vector<ColumnFamilyHandle*> handles;
  TransactionDB* txn_db = nullptr;
  TransactionDBOptions txn_db_opts;

  // 1. Open as a TransactionDB and populate it.
  {
    ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
                                  &handles, &txn_db));

    // Prepared-but-never-committed transaction writing "foo" to test_cf.
    // Owned by a unique_ptr so it is released even if an assertion below
    // returns early from the test body.
    std::unique_ptr<Transaction> txn(
        txn_db->BeginTransaction(WriteOptions(), TransactionOptions()));
    ASSERT_OK(txn->Put(handles[1], "foo", "value"));
    ASSERT_OK(txn->SetName("txn0"));
    ASSERT_OK(txn->Prepare());
    ASSERT_OK(txn_db->Flush(FlushOptions()));
    txn.reset();

    auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
    // A gtest assertion rather than assert(): it is still checked when the
    // test is compiled with NDEBUG.
    ASSERT_NE(dbimpl, nullptr);

    // Write key0/key1 to the default CF, switching the memtable after each.
    for (int i = 0; i < 2; ++i) {
      ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
                            "value" + std::to_string(i)));
      ASSERT_OK(dbimpl->TEST_SwitchMemtable());
    }

    // Committed transaction writing "foo1" to test_cf.
    txn.reset(txn_db->BeginTransaction(WriteOptions(), TransactionOptions()));
    ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
    ASSERT_OK(txn->Commit());
    txn.reset();

    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    delete txn_db;
    txn_db = nullptr;  // Do not leave a dangling pointer for later steps.
  }

  // 2. Truncate the second-newest WAL file.
  {
    std::vector<uint64_t> file_nums;
    GetSortedWalFiles(file_nums);
    // Guard the index below; with a plain assert() an NDEBUG build would
    // wrap around and read out of bounds instead of failing the test.
    ASSERT_GE(file_nums.size(), 2U);
    const uint64_t log_num = file_nums[file_nums.size() - 2];
    // NOTE(review): the trailing 8 is presumably the number of bytes to chop
    // off the file; confirm against CorruptFileWithTruncation's signature.
    CorruptFileWithTruncation(FileType::kWalFile, log_num,
                              8);
  }

  // 3. Reopen with an injected failure: the callback overwrites the Status
  // handed to the sync point with an IOError, and the open must report it.
  {
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::Open::BeforeSyncWAL", [](void* arg) {
          auto* tmp_s = static_cast<Status*>(arg);
          assert(tmp_s);
          *tmp_s = Status::IOError("Injected");
        });
    SyncPoint::GetInstance()->EnableProcessing();

    handles.clear();
    s = TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles,
                            &txn_db);
    ASSERT_TRUE(s.IsIOError());
    ASSERT_EQ("IO error: Injected", s.ToString());
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    CloseDb();

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }

  // 4. Truncate the newest WAL file.
  {
    std::vector<uint64_t> file_nums;
    GetSortedWalFiles(file_nums);
    // Fail the test cleanly instead of indexing with size() - 1 on an empty
    // vector.
    ASSERT_FALSE(file_nums.empty());
    const uint64_t log_num = file_nums.back();
    CorruptFileWithTruncation(FileType::kWalFile, log_num);
  }

  // 5. Reopen without fault injection and verify the visible state:
  // only "key0" in the default CF is readable; "foo", "foo1" and "key1" are
  // all expected to be absent.
  {
    ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
                                  &handles, &txn_db));
    {
      std::string v;
      ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
                Status::NotFound());
      v.clear();
      ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
      ASSERT_EQ("value" + std::to_string(0), v);
      v.clear();
      ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
                Status::NotFound());
      v.clear();
      ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
                Status::NotFound());
    }
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    delete txn_db;
    txn_db = nullptr;
  }
}
// Exercises point-in-time WAL recovery when the newest WAL is truncated and
// the first recovery attempt is aborted by an injected I/O error, in a
// database whose default column family was flushed after every write.
//
// Steps:
//   1. Write "old_key" to "test_cf", write and flush key0/key1 in the default
//      CF, then write "dontcare" to "test_cf" and close.
//   2. Truncate the newest WAL file.
//   3. Reopen with avoid_flush_during_recovery = true and an IOError injected
//      at "DBImpl::GetLogSizeAndMaybeTruncate:0"; the open must fail with it.
//   4. Reopen with the parameterized avoid_flush_during_recovery_ value and
//      check which keys are visible.
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
  CloseDb();
  Options options;
  options.level_compaction_dynamic_level_bytes = false;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  options.avoid_flush_during_recovery = false;
  options.env = env_.get();
  options.create_if_missing = true;
  ASSERT_OK(DestroyDB(dbname_, options));
  Reopen(&options);

  // Create the extra column family, then close so the database can be
  // reopened with an explicit list of both column families.
  ColumnFamilyHandle* cfh = nullptr;
  const std::string test_cf_name = "test_cf";
  Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  ASSERT_OK(s);
  delete cfh;
  CloseDb();

  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  cf_descs.emplace_back(test_cf_name, options);
  std::vector<ColumnFamilyHandle*> handles;

  // 1. Open and populate the database.
  {
    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
    for (int i = 0; i < 2; ++i) {
      ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
                         "value" + std::to_string(i)));
      ASSERT_OK(db_->Flush(FlushOptions()));
    }
    // Last write before closing; step 4 expects this key to be absent.
    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare"));
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    CloseDb();
  }

  // 2. Truncate the newest WAL file.
  {
    std::vector<uint64_t> file_nums;
    GetSortedWalFiles(file_nums);
    // Fail the test cleanly instead of indexing with size() - 1 on an empty
    // vector, which would wrap around and read out of bounds.
    ASSERT_FALSE(file_nums.empty());
    const uint64_t log_num = file_nums.back();
    // NOTE(review): the trailing 8 is presumably the number of bytes to chop
    // off the file; confirm against CorruptFileWithTruncation's signature.
    CorruptFileWithTruncation(FileType::kWalFile, log_num,
                              8);
  }

  // 3. Reopen with an injected failure: the callback overwrites the Status
  // handed to the sync point with an IOError, and the open must report it.
  {
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::GetLogSizeAndMaybeTruncate:0", [](void* arg) {
          auto* tmp_s = static_cast<Status*>(arg);
          assert(tmp_s);
          *tmp_s = Status::IOError("Injected");
        });
    SyncPoint::GetInstance()->EnableProcessing();

    handles.clear();
    options.avoid_flush_during_recovery = true;
    s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
    ASSERT_TRUE(s.IsIOError());
    ASSERT_EQ("IO error: Injected", s.ToString());
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
    CloseDb();

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }

  // 4. Reopen without fault injection and verify the visible state:
  // "old_key", "key0" and "key1" are readable; "dontcare" in test_cf is
  // expected to be absent.
  {
    options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
    {
      std::string v;
      ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
      ASSERT_EQ("dontcare", v);
      for (int i = 0; i < 2; ++i) {
        v.clear();
        ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
        ASSERT_EQ("value" + std::to_string(i), v);
      }
      v.clear();
      ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
                Status::NotFound());
    }
    for (auto* h : handles) {
      delete h;
    }
    // Same teardown as the other steps: drop the stale handle pointers and
    // close the database explicitly once its handles are released.
    handles.clear();
    CloseDb();
  }
}
}
// Test binary entry point. Installs the stack trace handler, lets GoogleTest
// consume its command-line flags, calls RegisterCustomObjects with the
// arguments, and returns the result of running all tests as the exit status.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  const int exit_status = RUN_ALL_TESTS();
  return exit_status;
}