#include <cstring>
#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "port/stack_trace.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/debug.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "test_util/sync_point.h"
#include "util/file_checksum_helper.h"
#include "util/random.h"
#include "utilities/counted_fs.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// Test flush-block policy that decides block boundaries by counting keys
// instead of looking at block size.
class MyFlushBlockPolicy : public FlushBlockPolicy {
 public:
  // `num_keys_in_block` is the key count compared against in Update();
  // `data_block_builder` is only observed (via empty()) and never modified.
  explicit MyFlushBlockPolicy(const int num_keys_in_block,
                              const BlockBuilder& data_block_builder)
      : num_keys_in_block_(num_keys_in_block),
        num_keys_(0),
        data_block_builder_(data_block_builder) {}

  // Returns true exactly when the builder is non-empty and the running key
  // count has reached `num_keys_in_block_`; in that case, and whenever the
  // builder is empty, the count restarts at 1. Otherwise the count is bumped
  // and false is returned. The key and value themselves are ignored.
  bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
    const bool starting_new_block = data_block_builder_.empty();
    const bool quota_reached =
        !starting_new_block && num_keys_ == num_keys_in_block_;
    if (starting_new_block || quota_reached) {
      num_keys_ = 1;
      return quota_reached;
    }
    ++num_keys_;
    return false;
  }

 private:
  const int num_keys_in_block_;
  int num_keys_;
  const BlockBuilder& data_block_builder_;
};
class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit MyFlushBlockPolicyFactory(const int num_keys_in_block)
: num_keys_in_block_(num_keys_in_block) {}
virtual const char* Name() const override {
return "MyFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& ,
const BlockBuilder& data_block_builder) const override {
return new MyFlushBlockPolicy(num_keys_in_block_, data_block_builder);
}
private:
const int num_keys_in_block_;
};
}
// File-local switch whose current value is reported by RocksDbIOUringEnable().
static bool enable_io_uring = true;
// C-linkage function returning the flag above.
// NOTE(review): presumably a hook that the RocksDB environment layer looks up
// to decide whether io_uring may be used -- confirm against the env code.
extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
// Fixture shared by the basic DB tests in this file. It constructs the
// DBTestBase with the name "db_basic_test" and `false` as the second
// constructor argument.
class DBBasicTest : public DBTestBase {
public:
DBBasicTest() : DBTestBase("db_basic_test", false) {}
};
// Opening the fixture's database path a second time must fail with a plain
// IOError whose message mentions the lock.
TEST_F(DBBasicTest, OpenWhenOpen) {
  Options second_open_options = CurrentOptions();
  second_open_options.env = env_;

  std::unique_ptr<DB> second_db;
  Status open_status = DB::Open(second_open_options, dbname_, &second_db);

  // The streamed lambda releases the unexpectedly opened handle and supplies
  // the failure message.
  ASSERT_NOK(open_status) << [&second_db]() {
    second_db.reset();
    return "db2 open: ok";
  }();

  ASSERT_EQ(Status::Code::kIOError, open_status.code());
  ASSERT_EQ(Status::SubCode::kNone, open_status.subcode());
  const char* const lock_mention = strstr(open_status.getState(), "lock ");
  ASSERT_TRUE(lock_mention != nullptr);
}
// With direct I/O for flush and compaction enabled, a zero
// writable_file_max_buffer_size must be rejected both at open time and when
// applied later through SetDBOptions().
TEST_F(DBBasicTest, EnableDirectIOWithZeroBuf) {
  const bool direct_io_available = IsDirectIOSupported();
  if (!direct_io_available) {
    ROCKSDB_GTEST_BYPASS("Direct IO not supported");
    return;
  }

  Options direct_io_options = GetDefaultOptions();
  direct_io_options.create_if_missing = true;
  direct_io_options.use_direct_io_for_flush_and_compaction = true;

  // Zero-sized buffer: the open itself is expected to fail.
  direct_io_options.writable_file_max_buffer_size = 0;
  ASSERT_TRUE(TryReopen(direct_io_options).IsInvalidArgument());

  // Non-zero buffer: open succeeds, then the dynamic change to zero is
  // expected to be refused.
  direct_io_options.writable_file_max_buffer_size = 1024;
  Reopen(direct_io_options);
  const std::unordered_map<std::string, std::string> zero_buffer_update = {
      {"writable_file_max_buffer_size", "0"}};
  ASSERT_TRUE(db_->SetDBOptions(zero_buffer_update).IsInvalidArgument());
}
// Checks GetDbSessionId(): the id differs between separate opens (regular or
// read-only), stays the same while one open instance performs reads/writes,
// and matches the pattern "[0-9A-Z]{20}".
TEST_F(DBBasicTest, UniqueSession) {
Options options = CurrentOptions();
std::string sid1, sid2, sid3, sid4;
// sid1: initial open. sid2 / sid4: second open, before and after a Put.
// sid3: third open.
ASSERT_OK(db_->GetDbSessionId(sid1));
Reopen(options);
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(db_->GetDbSessionId(sid4));
Reopen(options);
ASSERT_OK(db_->GetDbSessionId(sid3));
// Ids from three different opens are pairwise distinct ...
ASSERT_NE(sid1, sid2);
ASSERT_NE(sid1, sid3);
ASSERT_NE(sid2, sid3);
// ... while the write inside the second open did not change its id.
ASSERT_EQ(sid2, sid4);
// Format check (non-fatal EXPECTs so all three ids are reported).
TestRegex expected("[0-9A-Z]{20}");
EXPECT_MATCHES_REGEX(sid1, expected);
EXPECT_MATCHES_REGEX(sid2, expected);
EXPECT_MATCHES_REGEX(sid3, expected);
// Read-only open: its id (stored into sid1) must differ from the last
// regular open's id (sid3).
Close();
ASSERT_OK(ReadOnlyReopen(options));
ASSERT_OK(db_->GetDbSessionId(sid1));
ASSERT_NE(sid1, sid3);
// Second read-only open: id differs from the previous read-only open and is
// unchanged by a Get within the same open.
Close();
ASSERT_OK(ReadOnlyReopen(options));
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(db_->GetDbSessionId(sid3));
ASSERT_NE(sid1, sid2);
ASSERT_EQ(sid2, sid3);
// Same checks with an additional column family "goku": the id is stable
// across Put/Get (sid1..sid3) and changes after reopening (sid4).
DestroyAndReopen(options);
CreateAndReopenWithCF({"goku"}, options);
ASSERT_OK(db_->GetDbSessionId(sid1));
ASSERT_OK(Put("bar", "e1"));
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_EQ("e1", Get("bar"));
ASSERT_OK(db_->GetDbSessionId(sid3));
ReopenWithColumnFamilies({"default", "goku"}, options);
ASSERT_OK(db_->GetDbSessionId(sid4));
ASSERT_EQ(sid1, sid2);
ASSERT_EQ(sid2, sid3);
ASSERT_NE(sid1, sid4);
}
// Opens the DB via EnforcedReadOnlyReopen() and checks that reads and
// iterators see the expected two keys, both before and after the data has
// been flushed by a regular open, and that Flush, SyncWAL and the column
// family creation calls report kNotSupported.
TEST_F(DBBasicTest, ReadOnlyDB) {
// "foo" is written twice (v1 is flushed, v3 is not), "bar" once; the DB is
// closed with the second "foo" write not explicitly flushed.
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "v3"));
Close();
// Scans the whole iterator and expects exactly two entries and an OK status
// both during and after the scan.
auto verify_one_iter = [&](Iterator* iter) {
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
ASSERT_OK(iter->status());
ASSERT_EQ(count, 2);
};
// Runs verify_one_iter on an iterator from NewIterator() and on the single
// iterator returned by NewIterators() for the default column family.
auto verify_all_iters = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
verify_one_iter(iter);
delete iter;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(ReadOptions(),
{dbfull()->DefaultColumnFamily()}, &iters));
ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
verify_one_iter(iters[0]);
delete iters[0];
};
auto options = CurrentOptions();
assert(options.env == env_);
// First read-only open: latest values are visible, Flush is refused.
ASSERT_OK(EnforcedReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
verify_all_iters();
ASSERT_EQ(Flush().code(), Status::Code::kNotSupported);
Close();
// Regular open just to flush, then verify again in read-only mode.
Reopen(options);
ASSERT_OK(Flush());
Close();
ASSERT_OK(EnforcedReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
verify_all_iters();
ASSERT_EQ(db_->SyncWAL().code(), Status::Code::kNotSupported);
// cfhs holds one value-initialized (null) handle used as the output slot.
std::vector<ColumnFamilyHandle*> cfhs{{}};
// All three column-family creation entry points must be refused.
ASSERT_EQ(db_->CreateColumnFamily(options, "blah", &cfhs[0]).code(),
Status::Code::kNotSupported);
ASSERT_EQ(db_->CreateColumnFamilies(options, {"blah"}, &cfhs).code(),
Status::Code::kNotSupported);
std::vector<ColumnFamilyDescriptor> cfds;
cfds.push_back({"blah", options});
ASSERT_EQ(db_->CreateColumnFamilies(cfds, &cfhs).code(),
Status::Code::kNotSupported);
}
// With manual_wal_flush enabled, a read-only open must still see all three
// keys, must refuse FlushWAL() in both sync modes, and must still be able to
// report live file storage info.
TEST_F(DBBasicTest, ReadOnlyDBFlushWAL) {
  Options options = CurrentOptions();
  options.manual_wal_flush = true;
  DestroyAndReopen(options);

  // Two flushed keys plus one written after the flush.
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("baz", "v3"));
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v1", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_EQ("v3", Get("baz"));

  // FlushWAL(false) first, then FlushWAL(true).
  for (const bool sync : {false, true}) {
    ASSERT_EQ(db_->FlushWAL(sync).code(), Status::Code::kNotSupported);
  }

  LiveFilesStorageInfoOptions storage_info_opts;
  storage_info_opts.wal_size_for_flush = 0;
  std::vector<LiveFileStorageInfo> live_files;
  ASSERT_OK(db_->GetLiveFilesStorageInfo(storage_info_opts, &live_files));
  ASSERT_GT(live_files.size(), 0);
  Close();
}
// Creates a DB with write_dbid_to_manifest disabled, then reopens it in
// enforced read-only mode with the option enabled. The DB identity reported
// by the first read-only open must equal the one reported after an
// intermediate regular open + flush.
TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  auto options = CurrentOptions();
  options.write_dbid_to_manifest = false;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  options.write_dbid_to_manifest = true;
  assert(options.env == env_);
  ASSERT_OK(EnforcedReadOnlyReopen(options));
  std::string db_id1;
  ASSERT_OK(db_->GetDbIdentity(db_id1));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  {
    // Owned by unique_ptr so the iterator is released even when one of the
    // fatal assertions below returns early; the scope ends before Close().
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    int count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      ++count;
    }
    // Valid() turning false can also mean an error, so check the final
    // status as well (same as the ReadOnlyDB test does).
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, 2);
  }
  Close();

  // Regular open only to flush the memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();

  // Check keys and identity again in read-only mode.
  ASSERT_OK(EnforcedReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  std::string db_id2;
  ASSERT_OK(db_->GetDbIdentity(db_id2));
  ASSERT_EQ(db_id1, db_id2);
}
// Walks the DB through several on-disk shapes and, after each, opens it with
// ReadOnlyReopen() and inspects the error text returned by Put() to tell
// which read-only implementation was selected ("read only mode" vs
// "compacted db mode"). Also checks Get/MultiGet results and that
// GetLiveFiles / GetLiveFilesStorageInfo succeed.
TEST_F(DBBasicTest, CompactedDB) {
const uint64_t kFileSize = 1 << 20;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.write_buffer_size = kFileSize;
options.target_file_size_base = kFileSize;
options.max_bytes_for_level_base = 1 << 30;
options.compression = kNoCompression;
Reopen(options);
// Shape 1: a single flushed file holding "aaa".
ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
ASSERT_OK(Flush());
Close();
// Without max_open_files = -1 the Put error names "read only mode".
ASSERT_OK(ReadOnlyReopen(options));
Status s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
Close();
// With max_open_files = -1 the Put error names "compacted db mode".
options.max_open_files = -1;
ASSERT_OK(ReadOnlyReopen(options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported in compacted db mode.");
ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
Close();
// Shape 2: three more flushes plus one key that is never flushed.
Reopen(options);
ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
ASSERT_OK(Flush());
ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
ASSERT_OK(Flush());
ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
ASSERT_OK(Flush());
ASSERT_OK(Put("something_not_flushed", "x"));
Close();
// Here the Put error is back to "read only mode".
ASSERT_OK(ReadOnlyReopen(options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
// Both live-file queries are issued with flushing requested (third argument
// `true`, wal_size_for_flush = 0) and must succeed in this mode.
std::vector<std::string> files;
uint64_t manifest_file_size;
ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, true));
LiveFilesStorageInfoOptions lfsi_opts;
lfsi_opts.wal_size_for_flush = 0; std::vector<LiveFileStorageInfo> files2;
ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsi_opts, &files2));
Close();
// Shape 3: add more keys and run a full-range CompactRange; three files are
// expected at level 1 afterwards.
Reopen(options);
ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(3, NumTableFilesAtLevel(1));
Close();
// The Put error names "compacted db mode" again; verify point lookups for
// present keys and for absent keys before, between and after them.
ASSERT_OK(ReadOnlyReopen(options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported in compacted db mode.");
ASSERT_EQ("NOT_FOUND", Get("abc"));
ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
ASSERT_EQ("NOT_FOUND", Get("ccc"));
ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
ASSERT_EQ("NOT_FOUND", Get("ggg"));
ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
ASSERT_EQ("NOT_FOUND", Get("kkk"));
ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, true));
ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsi_opts, &files2));
// MultiGet over alternating present / absent keys.
std::vector<std::string> values;
std::vector<Status> status_list = dbfull()->MultiGet(
ReadOptions(),
std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
Slice("ggg"), Slice("iii"), Slice("kkk")}),
&values);
ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
ASSERT_OK(status_list[0]);
ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
ASSERT_TRUE(status_list[1].IsNotFound());
ASSERT_OK(status_list[2]);
ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
ASSERT_TRUE(status_list[3].IsNotFound());
ASSERT_OK(status_list[4]);
ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
ASSERT_TRUE(status_list[5].IsNotFound());
// Shape 4: one more unflushed write after the compaction; the Put error
// returns to "read only mode".
Reopen(options);
ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
Close();
ASSERT_OK(ReadOnlyReopen(options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
}
// Fills column family 1 until a file appears at level 2, then checks that
// reopening with num_levels = 1 is rejected with a specific message while
// reopening with num_levels = 10 succeeds.
TEST_F(DBBasicTest, LevelLimitReopen) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  // Keep writing 1 MiB values until level 2 of CF 1 is populated.
  const std::string one_mib_value(1024 * 1024, ' ');
  for (int next_key = 0; NumTableFilesAtLevel(2, 1) == 0; ++next_key) {
    ASSERT_OK(Put(1, Key(next_key), one_mib_value));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  // Fewer levels than the DB already uses: must fail.
  options.num_levels = 1;
  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  Status reopen_status =
      TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ(reopen_status.IsInvalidArgument(), true);
  ASSERT_EQ(reopen_status.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  // Ten levels: must succeed.
  options.num_levels = 10;
  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
// Put, overwrite and Delete of one key in column family 1, with a Get check
// after every step. The body repeats for as long as ChangeOptions() returns
// true.
TEST_F(DBBasicTest, PutDeleteGet) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_OK(Put(1, "foo", "v2"));
ASSERT_EQ("v2", Get(1, "foo"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
} while (ChangeOptions());
}
// Writes two keys to column family 1, SingleDeletes the first and checks it
// is no longer found. The loop passes a skip mask (FIFO compaction, universal
// compaction, merge-put) to ChangeOptions().
TEST_F(DBBasicTest, PutSingleDeleteGet) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_OK(Put(1, "foo2", "v2"));
ASSERT_EQ("v2", Get(1, "foo2"));
ASSERT_OK(SingleDelete(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
} while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
kSkipMergePut));
}
// Exercises TimedPut() together with Merge, Delete, DeleteRange and
// SingleDelete on column family 1, using the string-append merge operator.
// The loop passes kSkipPlainTable to ChangeOptions().
TEST_F(DBBasicTest, TimedPutBasic) {
do {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
CreateAndReopenWithCF({"pikachu"}, options);
// Two TimedPuts to the same key (last argument 0, then 3); the later one
// wins on read.
ASSERT_OK(TimedPut(1, "foo", "v1", 0));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_OK(TimedPut(1, "foo", "v2.1", 3));
ASSERT_EQ("v2.1", Get(1, "foo"));
// After flushing, a Merge is appended to the TimedPut value, and a Delete
// hides the key.
ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
ASSERT_OK(Merge(1, "foo", "v2.2"));
ASSERT_EQ("v2.1,v2.2", Get(1, "foo"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
ASSERT_OK(TimedPut(1, "bar", "bv1", 0));
ASSERT_EQ("bv1", Get(1, "bar"));
ASSERT_OK(TimedPut(1, "baz", "bzv1", 0));
ASSERT_EQ("bzv1", Get(1, "baz"));
// The range delete ["b", "baz") is skipped for the kRowCache configuration.
// It removes "bar" but leaves "baz" (the end key) readable.
if (option_config_ != kRowCache) {
std::string range_del_begin = "b";
std::string range_del_end = "baz";
Slice begin_rdel = range_del_begin, end_rdel = range_del_end;
ASSERT_OK(
db_->DeleteRange(WriteOptions(), handles_[1], begin_rdel, end_rdel));
ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
}
ASSERT_EQ("bzv1", Get(1, "baz"));
ASSERT_OK(SingleDelete(1, "baz"));
ASSERT_EQ("NOT_FOUND", Get(1, "baz"));
} while (ChangeOptions(kSkipPlainTable));
}
// Flushing a memtable that holds only a Put followed by a SingleDelete of the
// same key must leave no entries for that key. The loop passes a skip mask
// (FIFO compaction, universal compaction, merge-put) to ChangeOptions().
TEST_F(DBBasicTest, EmptyFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Put an empty value and immediately single-delete it.
    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
// After flushing column family 1, the key written there is still readable
// from CF 1 and is not visible in CF 0. The body repeats for as long as
// ChangeOptions() returns true.
TEST_F(DBBasicTest, GetFromVersions) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Flush(1));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
} while (ChangeOptions());
}
// For a short key and a 200-character key: a snapshot taken between two Puts
// keeps returning the older value, before and after a flush, while
// non-snapshot reads return the newer one.
TEST_F(DBBasicTest, GetSnapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));

    const std::vector<std::string> test_keys = {std::string("foo"),
                                                std::string(200, 'x')};
    for (const std::string& key : test_keys) {
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* const old_view = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));

      // Before the flush.
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, old_view));

      // After the flush.
      ASSERT_OK(Flush(1));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, old_view));

      db_->ReleaseSnapshot(old_view);
    }
  } while (ChangeOptions());
}
// While the fixture holds the database open, a second DB::Open() on the same
// path must fail; on Linux the error text must mention the lock. The body
// repeats for as long as ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, CheckLock) {
  do {
    std::unique_ptr<DB> localdb;
    Options options = CurrentOptions();
    ASSERT_OK(TryReopen(options));

    // Second open of the same path is expected to fail. The streamed lambda
    // releases the unexpectedly opened handle and supplies the message.
    Status s = DB::Open(options, dbname_, &localdb);
    ASSERT_NOK(s) << [&localdb]() {
      localdb.reset();
      return "localdb open: ok";
    }();
#ifdef OS_LINUX
    ASSERT_TRUE(s.ToString().find("lock ") != std::string::npos);
#endif  // OS_LINUX
    // The loop terminator must be on its own line: anything after `#endif`
    // on the same line is not compiled as code.
  } while (ChangeCompactOptions());
}
// With WAL disabled and min_write_buffer_number_to_merge = 3, two keys
// written around an explicit flush must both stay readable, and a second
// flush must succeed. The body repeats for as long as ChangeCompactOptions()
// returns true.
TEST_F(DBBasicTest, FlushMultipleMemtable) {
  do {
    WriteOptions no_wal_write;
    no_wal_write.disableWAL = true;

    Options options = CurrentOptions();
    options.max_write_buffer_number = 4;
    options.min_write_buffer_number_to_merge = 3;
    options.max_write_buffer_size_to_maintain = -1;
    CreateAndReopenWithCF({"pikachu"}, options);

    ColumnFamilyHandle* const pikachu_cf = handles_[1];
    ASSERT_OK(dbfull()->Put(no_wal_write, pikachu_cf, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(no_wal_write, pikachu_cf, "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}
// Occupies the single HIGH and the single LOW priority background thread
// with sleeping tasks, then checks that flushing empty column families,
// writing and reading still succeed, and that flushing also succeeds once
// the HIGH priority task has been released.
TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
// One thread per pool, each blocked by a SleepingBackgroundTask until woken.
env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_high, Env::Priority::HIGH);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.max_write_buffer_number = 2;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_size_to_maintain =
static_cast<int64_t>(options.write_buffer_size);
CreateAndReopenWithCF({"pikachu"}, options);
// Both column families are still empty here; these flushes must succeed
// even though both background threads are blocked.
ASSERT_OK(Flush(0));
ASSERT_OK(Flush(1));
// Writes (WAL disabled) and reads also go through.
ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
ASSERT_EQ("v1", Get(0, "foo"));
ASSERT_EQ("v1", Get(1, "bar"));
// Release the HIGH priority thread before flushing the now non-empty
// column families.
sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();
ASSERT_OK(Flush(0));
ASSERT_OK(Flush(1));
// Release the LOW priority thread before the locals go out of scope.
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
}
// Interleaves Puts (with WAL disabled, then enabled), explicit flushes and
// reopens on column family 1, checking values after each reopen and two perf
// context counters for reads. The body repeats for as long as
// ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, Flush) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
SetPerfLevel(kEnableTime);
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
// The Get result is deliberately discarded; only the perf counters it
// updates are inspected.
get_perf_context()->Reset();
Get(1, "foo");
ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
// NOTE(review): 2 presumably equals the length of the value "v1" -- confirm.
ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
// "bar" was written without WAL and without an explicit flush, yet is
// expected to be readable after the reopen.
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v1", Get(1, "bar"));
// disableWAL is already true at this point; the assignment restates it.
writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
ASSERT_OK(Flush(1));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
ASSERT_EQ("v2", Get(1, "bar"));
get_perf_context()->Reset();
ASSERT_EQ("v2", Get(1, "foo"));
ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
// Final round with the WAL enabled.
writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
ASSERT_OK(Flush(1));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
ASSERT_EQ("v3", Get(1, "foo"));
ASSERT_EQ("v3", Get(1, "bar"));
SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
}
// Checks how GetDbIdentity() behaves across reopens for every combination of
// write_dbid_to_manifest (with_manifest) and keeping the identity file
// (write_file), including after the identity file has been deleted, emptied,
// or overwritten with an arbitrary string. The body repeats for as long as
// ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, IdentityAcrossRestarts) {
constexpr size_t kMinIdSize = 10;
do {
for (bool with_manifest : {false, true}) {
for (bool write_file : {false, true}) {
std::string idfilename = IdentityFileName(dbname_);
std::string id1, tmp;
ASSERT_OK(db_->GetDbIdentity(id1));
ASSERT_GE(id1.size(), kMinIdSize);
Options options = CurrentOptions();
options.write_dbid_to_manifest = with_manifest;
// Step 1: reopen with the identity file enabled; the id is unchanged and
// equals the file's content.
options.write_identity_file = true; Reopen(options);
std::string id2;
ASSERT_OK(db_->GetDbIdentity(id2));
ASSERT_EQ(id1, id2);
ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
ASSERT_EQ(tmp, id2);
// Step 2: either delete the identity file (write_file) or switch the
// identity file off for the following reopens.
if (write_file) {
ASSERT_OK(env_->DeleteFile(idfilename));
} else {
options.write_identity_file = false;
if (!with_manifest) {
// No identity file and no manifest id: the open is expected to fail.
// Restore a usable configuration and move on to the next combination.
ASSERT_NOK(TryReopen(options));
options.write_identity_file = true;
Reopen(options);
continue;
}
}
Reopen(options);
std::string id3;
ASSERT_OK(db_->GetDbIdentity(id3));
if (with_manifest) {
// Same id as before when the manifest option is on.
ASSERT_EQ(id1, id3);
} else {
// Otherwise a different id of at least kMinIdSize characters.
ASSERT_NE(id1, id3);
ASSERT_GE(id3.size(), kMinIdSize);
}
// Step 3: with write_file the identity file exists again and matches id3;
// it is then replaced by an empty file. Without write_file it must be
// absent.
if (write_file) {
ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
ASSERT_EQ(tmp, id3);
std::unique_ptr<WritableFile> w;
ASSERT_OK(env_->NewWritableFile(idfilename, &w, EnvOptions()));
ASSERT_OK(w->Close());
} else {
ASSERT_TRUE(env_->FileExists(idfilename).IsNotFound());
}
Reopen(options);
std::string id4;
ASSERT_OK(db_->GetDbIdentity(id4));
if (with_manifest) {
ASSERT_EQ(id1, id4);
} else {
ASSERT_NE(id1, id4);
ASSERT_GE(id4.size(), kMinIdSize);
}
// Step 4: overwrite the identity file with an arbitrary string.
std::string silly_id = "asdf123456789";
if (write_file) {
ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
ASSERT_EQ(tmp, id4);
std::unique_ptr<WritableFile> w;
ASSERT_OK(env_->NewWritableFile(idfilename, &w, EnvOptions()));
ASSERT_OK(w->Append(silly_id));
ASSERT_OK(w->Close());
} else {
ASSERT_TRUE(env_->FileExists(idfilename).IsNotFound());
}
Reopen(options);
std::string id5;
ASSERT_OK(db_->GetDbIdentity(id5));
if (with_manifest) {
// With the manifest option the original id is still reported ...
ASSERT_EQ(id1, id5);
} else {
// ... without it the file's arbitrary content is reported as the id.
ASSERT_EQ(id5, silly_id);
}
if (write_file) {
ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
ASSERT_EQ(tmp, id5);
} else {
ASSERT_TRUE(env_->FileExists(idfilename).IsNotFound());
}
}
}
} while (ChangeCompactOptions());
}
// Deleting the lock file of a closed database must not prevent reopening it,
// and the DB identity must be the same afterwards. Run with
// best_efforts_recovery off and on.
TEST_F(DBBasicTest, LockFileRecovery) {
  Options options = CurrentOptions();
  for (const bool best_efforts : {false, true}) {
    options.best_efforts_recovery = best_efforts;
    DestroyAndReopen(options);

    std::string id_before;
    ASSERT_OK(db_->GetDbIdentity(id_before));
    Close();

    // Remove the lock file while the database is closed.
    ASSERT_OK(env_->DeleteFile(LockFileName(dbname_)));

    Reopen(options);
    std::string id_after;
    ASSERT_OK(db_->GetDbIdentity(id_after));
    ASSERT_EQ(id_before, id_after);
  }
}
// Takes three snapshots (s1, s2 and a scoped ManagedSnapshot s3) between
// writes to two column families and checks, as they are created and
// released: the snapshot count, the time and sequence number reported for
// the oldest snapshot, and the values visible through each snapshot.
// The body repeats for as long as ChangeOptions() returns true.
TEST_F(DBBasicTest, Snapshot) {
env_->SetMockSleep();
anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot;
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
ASSERT_OK(Put(0, "foo", "0v1"));
ASSERT_OK(Put(1, "foo", "1v1"));
// s1 captures the *v1 values and becomes the oldest snapshot.
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_EQ(1U, GetNumSnapshots());
uint64_t time_snap1 = GetTimeOldestSnapshots();
ASSERT_GT(time_snap1, 0U);
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
ASSERT_OK(Put(0, "foo", "0v2"));
ASSERT_OK(Put(1, "foo", "1v2"));
// Advance the mock clock by one second before taking s2, so that s2's time
// is later than s1's (relied on by the ASSERT_LT after s1 is released).
env_->MockSleepForSeconds(1);
const Snapshot* s2 = db_->GetSnapshot();
ASSERT_EQ(2U, GetNumSnapshots());
ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
ASSERT_OK(Put(0, "foo", "0v3"));
ASSERT_OK(Put(1, "foo", "1v3"));
{
// s3 exists only inside this scope; the snapshot count drops back to 2
// after the closing brace.
ManagedSnapshot s3(db_.get());
ASSERT_EQ(3U, GetNumSnapshots());
ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
ASSERT_OK(Put(0, "foo", "0v4"));
ASSERT_OK(Put(1, "foo", "1v4"));
ASSERT_EQ("0v1", Get(0, "foo", s1));
ASSERT_EQ("1v1", Get(1, "foo", s1));
ASSERT_EQ("0v2", Get(0, "foo", s2));
ASSERT_EQ("1v2", Get(1, "foo", s2));
ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
ASSERT_EQ("0v4", Get(0, "foo"));
ASSERT_EQ("1v4", Get(1, "foo"));
}
ASSERT_EQ(2U, GetNumSnapshots());
ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
ASSERT_EQ("0v1", Get(0, "foo", s1));
ASSERT_EQ("1v1", Get(1, "foo", s1));
ASSERT_EQ("0v2", Get(0, "foo", s2));
ASSERT_EQ("1v2", Get(1, "foo", s2));
ASSERT_EQ("0v4", Get(0, "foo"));
ASSERT_EQ("1v4", Get(1, "foo"));
// Releasing s1 makes s2 the oldest snapshot.
db_->ReleaseSnapshot(s1);
ASSERT_EQ("0v2", Get(0, "foo", s2));
ASSERT_EQ("1v2", Get(1, "foo", s2));
ASSERT_EQ("0v4", Get(0, "foo"));
ASSERT_EQ("1v4", Get(1, "foo"));
ASSERT_EQ(1U, GetNumSnapshots());
ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
// With no snapshots left the oldest sequence number is reported as 0.
db_->ReleaseSnapshot(s2);
ASSERT_EQ(0U, GetNumSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
ASSERT_EQ("0v4", Get(0, "foo"));
ASSERT_EQ("1v4", Get(1, "foo"));
} while (ChangeOptions());
}
// Value-parameterized fixture: each instance runs with the option
// configuration given by its integer test parameter.
class DBBasicMultiConfigs : public DBBasicTest,
                            public ::testing::WithParamInterface<int> {
 public:
  DBBasicMultiConfigs() { option_config_ = GetParam(); }

  // Returns every configuration value in [kDefault, kEnd) for which
  // ShouldSkipOptions(value, kSkipFIFOCompaction) is false, in ascending
  // order.
  static std::vector<int> GenerateOptionConfigs() {
    std::vector<int> selected;
    for (int candidate = kDefault; candidate < kEnd; ++candidate) {
      if (ShouldSkipOptions(candidate, kSkipFIFOCompaction)) {
        continue;
      }
      selected.push_back(candidate);
    }
    return selected;
  }
};
// Writes six versions of one key with snapshots after the first and the
// fourth, then checks which versions remain after a flush and after
// full-range compactions as the snapshots are released one by one.
TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  FillLevels("a", "z", 1);

  ASSERT_OK(Put(1, "foo", "first"));
  const Snapshot* snapshot1 = db_->GetSnapshot();
  ASSERT_OK(Put(1, "foo", "second"));
  ASSERT_OK(Put(1, "foo", "third"));
  ASSERT_OK(Put(1, "foo", "fourth"));
  const Snapshot* snapshot2 = db_->GetSnapshot();
  ASSERT_OK(Put(1, "foo", "fifth"));
  ASSERT_OK(Put(1, "foo", "sixth"));

  // Before flushing, every version is still present.
  ASSERT_EQ(AllEntriesFor("foo", 1),
            "[ sixth, fifth, fourth, third, second, first ]");
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  ASSERT_EQ("first", Get(1, "foo", snapshot1));

  // After the flush only the newest version and the two versions visible to
  // a snapshot remain.
  ASSERT_OK(Flush(1));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

  // Releasing snapshot1 and compacting drops "first".
  db_->ReleaseSnapshot(snapshot1);
  FillLevels("a", "z", 1);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

  // Releasing snapshot2 and compacting drops "fourth" as well.
  db_->ReleaseSnapshot(snapshot2);
  FillLevels("a", "z", 1);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));
  ASSERT_EQ("sixth", Get(1, "foo"));
  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
}
// Instantiates the DBBasicMultiConfigs tests once for every value returned by
// DBBasicMultiConfigs::GenerateOptionConfigs().
INSTANTIATE_TEST_CASE_P(
DBBasicMultiConfigs, DBBasicMultiConfigs,
::testing::ValuesIn(DBBasicMultiConfigs::GenerateOptionConfigs()));
// Checks DB::Open() against the four combinations of create_if_missing and
// error_if_exists that matter for a missing and then an existing database.
TEST_F(DBBasicTest, DBOpen_Options) {
  Options options = CurrentOptions();
  Close();
  Destroy(options);

  std::unique_ptr<DB> db;

  // Missing DB, creation not allowed: open fails with "does not exist".
  options.create_if_missing = false;
  Status s = DB::Open(options, dbname_, &db);
  const std::string missing_msg = s.ToString();
  ASSERT_TRUE(strstr(missing_msg.c_str(), "does not exist") != nullptr);
  ASSERT_TRUE(nullptr == db);

  // Missing DB, creation allowed: open succeeds.
  options.create_if_missing = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(nullptr != db);
  db.reset();

  // Existing DB with error_if_exists: open fails with "exists".
  options.create_if_missing = false;
  options.error_if_exists = true;
  s = DB::Open(options, dbname_, &db);
  const std::string exists_msg = s.ToString();
  ASSERT_TRUE(strstr(exists_msg.c_str(), "exists") != nullptr);
  ASSERT_TRUE(nullptr == db);

  // Existing DB, create_if_missing set, error_if_exists cleared: succeeds.
  options.create_if_missing = true;
  options.error_if_exists = false;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(nullptr != db);
  db.reset();
}
// Checks, via AllEntriesFor(), which versions and deletion markers of one key
// survive a flush and a subsequent full-range CompactRange for several write
// patterns, including patterns with a live snapshot. The body repeats for as
// long as ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, CompactOnFlush) {
anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot;
do {
Options options = CurrentOptions(options_override);
options.disable_auto_compactions = true;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
// Flush a second file containing keys on both sides of "foo".
ASSERT_OK(Put(1, "a", "begin"));
ASSERT_OK(Put(1, "z", "end"));
ASSERT_OK(Flush(1));
// Case 1: Delete followed by Put. The flush drops the DEL; the compaction
// then drops the older v1.
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(Put(1, "foo", "v2"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
// Case 2: two Deletes. The flush keeps one DEL; the compaction removes the
// key entirely.
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 3: Put followed by Delete. The flush keeps only the DEL.
ASSERT_OK(Put(1, "foo", "v3"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 4: two Puts. The flush keeps only the newer value.
ASSERT_OK(Put(1, "foo", "v4"));
ASSERT_OK(Put(1, "foo", "v5"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
// Reset: delete the key and compact it away.
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 5: a snapshot taken between two Puts keeps both versions through the
// flush.
ASSERT_OK(Put(1, "foo", "v6"));
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(Put(1, "foo", "v7"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
db_->ReleaseSnapshot(snapshot);
// Reset again.
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 6: a snapshot taken before both Puts does not keep the older value.
const Snapshot* snapshot1 = db_->GetSnapshot();
ASSERT_OK(Put(1, "foo", "v8"));
ASSERT_OK(Put(1, "foo", "v9"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
db_->ReleaseSnapshot(snapshot1);
} while (ChangeCompactOptions());
}
// Writes one key into each of eight column families (the default one plus
// seven named ones) and checks that flushing them one at a time adds exactly
// one table file per flush.
TEST_F(DBBasicTest, FlushOneColumnFamily) {
  Options options = CurrentOptions();
  const std::vector<std::string> extra_cf_names = {
      "pikachu", "ilya",    "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(extra_cf_names, options);

  // CF 0 is the default column family; CF k (k >= 1) is extra_cf_names[k-1].
  // Each named CF gets a single key/value pair equal to its own name.
  ASSERT_OK(Put(0, "Default", "Default"));
  const int num_extra_cfs = static_cast<int>(extra_cf_names.size());
  for (int cf = 1; cf <= num_extra_cfs; ++cf) {
    const std::string& cf_name = extra_cf_names[cf - 1];
    ASSERT_OK(Put(cf, cf_name, cf_name));
  }

  const int total_cfs = num_extra_cfs + 1;
  for (int cf = 0; cf < total_cfs; ++cf) {
    ASSERT_OK(Flush(cf));
    const auto table_files = ListTableFiles(env_, dbname_);
    ASSERT_EQ(table_files.size(), cf + 1U);
  }
}
// MultiGet over a mix of present, deleted and never-written keys in column
// family 1: checks the returned values, the per-key statuses and the
// multiget_read_bytes perf counter. The body repeats for as long as
// ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, MultiGetSimple) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
SetPerfLevel(kEnableCount);
ASSERT_OK(Put(1, "k1", "v1"));
ASSERT_OK(Put(1, "k2", "v2"));
ASSERT_OK(Put(1, "k3", "v3"));
ASSERT_OK(Put(1, "k4", "v4"));
// "k4" is written and then deleted; "no_key" is deleted without ever having
// been written.
ASSERT_OK(Delete(1, "k4"));
ASSERT_OK(Put(1, "k5", "v5"));
ASSERT_OK(Delete(1, "no_key"));
std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
// Pre-filled with 20 placeholder strings; the size assertion below expects
// MultiGet to leave exactly keys.size() entries.
std::vector<std::string> values(20, "Temporary data to be overwritten");
std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
get_perf_context()->Reset();
std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
ASSERT_EQ(values.size(), keys.size());
ASSERT_EQ(values[0], "v1");
ASSERT_EQ(values[1], "v2");
ASSERT_EQ(values[2], "v3");
ASSERT_EQ(values[4], "v5");
// NOTE(review): 8 presumably is four found values of two bytes each --
// confirm.
ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
ASSERT_OK(s[0]);
ASSERT_OK(s[1]);
ASSERT_OK(s[2]);
ASSERT_TRUE(s[3].IsNotFound());
ASSERT_OK(s[4]);
ASSERT_TRUE(s[5].IsNotFound());
SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
}
// MultiGet edge cases: an empty key set (on the current DB and on a freshly
// recreated one) yields no statuses, and looking up two keys in an empty
// database yields NotFound for both. The body repeats for as long as
// ChangeCompactOptions() returns true.
TEST_F(DBBasicTest, MultiGetEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    // Empty key set.
    std::vector<Slice> keys;
    std::vector<std::string> values;
    std::vector<ColumnFamilyHandle*> cfs;
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(statuses.size(), 0U);

    // Freshly recreated database, still an empty key set.
    Options options = CurrentOptions();
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(statuses.size(), 0U);

    // Freshly recreated database, two keys, one per column family.
    keys = {Slice("a"), Slice("b")};
    cfs = {handles_[0], handles_[1]};
    statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
    ASSERT_EQ(static_cast<int>(statuses.size()), 2);
    ASSERT_TRUE(statuses[0].IsNotFound() && statuses[1].IsNotFound());
  } while (ChangeCompactOptions());
}
// Value-parameterized fixture whose uint32_t parameter is used by the test
// below as the block-based table format_version.
class DBBlockChecksumTest : public DBBasicTest,
public testing::WithParamInterface<uint32_t> {};
// Runs the fixture's tests once for each entry of
// test::kFooterFormatVersionsToTest.
INSTANTIATE_TEST_CASE_P(FormatVersions, DBBlockChecksumTest,
testing::ValuesIn(test::kFooterFormatVersionsToTest));
// Writes one table file per supported checksum type, then verifies that all
// keys are readable regardless of which checksum type the table factory is
// currently configured with, and that an unknown checksum type is rejected
// at open time.
TEST_P(DBBlockChecksumTest, BlockChecksumTest) {
  BlockBasedTableOptions table_options;
  table_options.format_version = GetParam();
  Options options = CurrentOptions();

  constexpr int kNumPerFile = 2;
  const auto checksum_types = GetSupportedChecksums();
  const int num_types = static_cast<int>(checksum_types.size());

  // Write phase: file number `type_idx` holds kNumPerFile consecutive keys
  // and is written with checksum_types[type_idx].
  for (int type_idx = 0; type_idx < num_types; ++type_idx) {
    table_options.checksum = checksum_types[type_idx];
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    const int first_key = type_idx * kNumPerFile;
    for (int offset = 0; offset < kNumPerFile; ++offset) {
      const int key_num = first_key + offset;
      ASSERT_OK(Put(Key(key_num), Key(key_num)));
    }
    ASSERT_OK(Flush());
  }

  // Read phase: with each checksum type configured in turn, every key from
  // every file must still be readable.
  const int total_keys = num_types * kNumPerFile;
  for (int type_idx = 0; type_idx < num_types; ++type_idx) {
    table_options.checksum = checksum_types[type_idx];
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    for (int key_num = 0; key_num < total_keys; ++key_num) {
      ASSERT_EQ(Key(key_num), Get(Key(key_num)));
    }
  }

  // An out-of-range checksum type must be refused.
  table_options.checksum = static_cast<ChecksumType>(123);
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
}
#ifndef OS_WIN
// Checks which combinations of use_direct_reads and allow_mmap_reads the DB
// accepts on open: only "both enabled" is expected to fail.
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options opts = CurrentOptions();

  // direct reads + mmap reads: must be rejected.
  opts.use_direct_reads = true;
  opts.allow_mmap_reads = true;
  ASSERT_NOK(TryReopen(opts));

  // mmap reads only: accepted.
  opts.use_direct_reads = false;
  ASSERT_OK(TryReopen(opts));

  // direct reads only: accepted where direct IO is available.
  if (IsDirectIOSupported()) {
    opts.use_direct_reads = true;
    opts.allow_mmap_reads = false;
    ASSERT_OK(TryReopen(opts));
  }

  // direct reads off again; allow_mmap_reads keeps whatever the branch above
  // left it as.
  opts.use_direct_reads = false;
  ASSERT_OK(TryReopen(opts));
}
#endif
// Env wrapper that hands out TestLogger instances and counts how many times
// those loggers have been closed.
class TestEnv : public EnvWrapper {
 public:
  explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}

  static const char* kClassName() { return "TestEnv"; }
  const char* Name() const override { return kClassName(); }

  // Logger that discards all output. Closing it bumps the owning TestEnv's
  // close counter and always reports an IOError.
  class TestLogger : public Logger {
   public:
    using Logger::Logv;

    explicit TestLogger(TestEnv* env_ptr) : Logger(), env(env_ptr) {}

    // If the logger was never closed, run the close path from the destructor
    // and deliberately ignore its (always failing) status.
    ~TestLogger() override {
      if (closed_) {
        return;
      }
      CloseHelper().PermitUncheckedError();
    }

    void Logv(const char* /*format*/, va_list /*ap*/) override {}

   protected:
    Status CloseImpl() override { return CloseHelper(); }

   private:
    // Records the close on the env, then returns IOError so callers can
    // observe that the logger's close status is propagated.
    Status CloseHelper() {
      env->CloseCountInc();
      return Status::IOError();
    }

    TestEnv* env;
  };

  void CloseCountInc() { close_count++; }

  int GetCloseCount() { return close_count; }

  // Ignores the file name and always produces a fresh TestLogger bound to
  // this env.
  Status NewLogger(const std::string& /*fname*/,
                   std::shared_ptr<Logger>* result) override {
    result->reset(new TestLogger(this));
    return Status::OK();
  }

 private:
  int close_count;
};
// Tracks when the info logger created through TestEnv gets closed: on an
// explicit DB::Close(), on DB destruction, via WaitForCompact(close_db), and
// never by the DB when the logger was supplied by the caller.
TEST_F(DBBasicTest, DBClose) {
  Options options = GetDefaultOptions();
  std::string dbname = test::PerThreadDBPath("db_close_test");
  ASSERT_OK(DestroyDB(dbname, options));

  std::unique_ptr<DB> db;
  std::unique_ptr<TestEnv> local_env_guard(new TestEnv(env_));
  TestEnv* env = local_env_guard.get();
  options.create_if_missing = true;
  options.env = env;

  // 1) Explicit Close(): the logger is closed once and its IOError is what
  //    Close() returns. Destroying the DB afterwards does not close it again.
  Status status = DB::Open(options, dbname, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);
  status = db->Close();
  ASSERT_EQ(env->GetCloseCount(), 1);
  ASSERT_EQ(status, Status::IOError());
  db.reset();
  ASSERT_EQ(env->GetCloseCount(), 1);

  // 2) No Close() call: destroying the DB still closes the logger.
  status = DB::Open(options, dbname, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);
  db.reset();
  ASSERT_EQ(env->GetCloseCount(), 2);

  // 3) WaitForCompact() with close_db set closes the DB and reports the same
  //    IOError produced by TestLogger::CloseHelper().
  options.create_if_missing = false;
  status = DB::Open(options, dbname, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);
  WaitForCompactOptions wfc_options = WaitForCompactOptions();
  wfc_options.close_db = true;
  status = db->WaitForCompact(wfc_options);
  ASSERT_EQ(env->GetCloseCount(), 3);
  ASSERT_EQ(status, Status::IOError());
  db.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);

  // 4) Caller-provided logger: Close() succeeds and leaves the logger alone;
  //    it is only closed when the last reference in options is dropped.
  options.info_log.reset(new TestEnv::TestLogger(env));
  status = DB::Open(options, dbname, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);
  status = db->Close();
  ASSERT_EQ(status, Status::OK());
  db.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);
  options.info_log.reset();
  ASSERT_EQ(env->GetCloseCount(), 4);
}
// Opens a DB that uses a separate WAL dir and three db_paths on top of a
// CountedFileSystem, closes it, and checks that every directory that was
// opened has also been closed.
TEST_F(DBBasicTest, DBCloseAllDirectoryFDs) {
  Options options = GetDefaultOptions();
  const std::string dbname =
      test::PerThreadDBPath("db_close_all_dir_fds_test");

  // Spread the DB across several directories so more than one directory
  // handle is involved.
  options.wal_dir = dbname + "_wal_dir";
  options.db_paths.emplace_back(dbname + "_1", 512 * 1024);
  options.db_paths.emplace_back(dbname + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname + "_3", 1024 * 1024 * 1024);
  ASSERT_OK(DestroyDB(dbname, options));

  std::unique_ptr<DB> db;
  std::unique_ptr<Env> counting_env = NewCompositeEnv(
      std::make_shared<CountedFileSystem>(FileSystem::Default()));
  options.create_if_missing = true;
  options.env = counting_env.get();

  Status open_status = DB::Open(options, dbname, &db);
  ASSERT_OK(open_status);
  ASSERT_TRUE(db != nullptr);

  Status close_status = db->Close();

  // Directory opens and closes recorded by the counting file system must
  // balance once the DB is closed.
  auto* counted_fs =
      options.env->GetFileSystem()->CheckedCast<CountedFileSystem>();
  ASSERT_TRUE(counted_fs != nullptr);
  ASSERT_EQ(counted_fs->counters()->dir_opens,
            counted_fs->counters()->dir_closes);
  ASSERT_OK(close_status);
  db.reset();
}
// Closes a DB while the fault-injection env is inactive and checks that
// Close() fails, keeps failing on retry, and still fails after the file
// system is re-activated.
TEST_F(DBBasicTest, DBCloseFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));

  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.manual_wal_flush = true;
  options.write_buffer_size = 100;
  options.env = fault_env.get();
  Reopen(options);

  // Leave data in a switched-out memtable and in the active one.
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "value3"));

  // First Close() with the file system deactivated must not succeed.
  fault_env->SetFilesystemActive(false);
  Status close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());

  // A retry while still inactive must not succeed either.
  close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());

  // Even with the file system active again, Close() is expected to keep
  // reporting failure.
  fault_env->SetFilesystemActive(true);
  close_status = dbfull()->Close();
  ASSERT_NE(close_status, Status::OK());

  Destroy(options);
}
// Parameterized fixture for the MultiGet tests. The tests read the tuple as:
//   std::get<0> - use batched MultiGet (batched-only tests bypass when false);
//   std::get<1> - async IO; assigned to ReadOptions::async_io, and tests are
//                 skipped/bypassed when it is set without USE_COROUTINES.
class DBMultiGetTestWithParam
: public DBBasicTest,
public testing::WithParamInterface<std::tuple<bool, bool>> {};
// Reads keys from eight column families in one MultiGet while a sync-point
// callback flushes and overwrites the data part-way through, and checks that
// the overwritten ("_2") values are what MultiGet returns.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // This declaration must be on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  // <column family index, key, value> tuples: three entries for each of the
  // eight column families (index 0 plus the seven created above).
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    // Index of the key within its column family (0..2), so that each column
    // family receives three distinct keys.
    int cf_key = i % 3;
    cf_kv_vec.emplace_back(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = dbfull();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // On the second invocation, overwrite every key with a "_2" value
          // so the MultiGet in flight has to observe the newer data.
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            // NOTE(review): with `i % 8` only i = 0, 8, 16 trigger a flush,
            // i.e. column families 0, 2 and 5 - confirm whether flushing
            // every column family was intended.
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          // On the eleventh invocation every column family's thread-local
          // super version slot is expected to be marked in use.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  // Request every key, in column family order.
  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));
  ASSERT_EQ(values.size(), num_keys);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  // A smaller request touching column families 0, 1, 1.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  // Same idea with the column families given out of order (2, 2, 0).
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  // After all reads, no column family may be left with its thread-local
  // super version slot marked in use or obsolete.
  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                    dbfull()->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}
// Keeps invalidating MultiGet's view (flush + overwrite on every second
// super version ref) until the "LastTry" sync point fires, then has a second
// thread write and flush between the last-try ref and unref sync points.
// MultiGet must return the values written by the final retry, not the
// "_val_after_last_try" values.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // This declaration must be on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiCFSnapshot::LastTry",
      [&](void* /*arg*/) { last_try = true; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
        // Once the last try has started, stop interfering.
        if (last_try) {
          return;
        }
        if (++get_sv_count == 2) {
          // Every second ref: count a retry, then flush and overwrite each
          // column family with a value tagged by the retry number.
          ++retries;
          get_sv_count = 0;
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(
                i, "cf" + std::to_string(i) + "_key",
                "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
          }
        }
      });

  // Order the helper thread's writes between MultiGet's last-try ref and
  // unref sync points.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::MultiCFSnapshot::AfterLastTryRefSV",
       "DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"},
      {"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV",
       "DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  port::Thread create_sv_thread([this]() {
    TEST_SYNC_POINT(
        "DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV");
    // Write newer values and flush every column family; these must not be
    // visible to the MultiGet that is already in its last try.
    for (int i = 0; i < 8; ++i) {
      ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                    "cf" + std::to_string(i) + "_val_after_last_try"));
      ASSERT_OK(Flush(i));
    }
    TEST_SYNC_POINT(
        "DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV");
  });

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));
  create_sv_thread.join();
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j],
              "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  }

  // No column family may be left with its thread-local super version slot
  // marked in use.
  for (int i = 0; i < 8; ++i) {
    auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                    dbfull()->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// MultiGet with an explicit snapshot across eight column families: data is
// flushed and overwritten while MultiGet is running, and the snapshot must
// still yield the original "_val" values.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // This declaration must be on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = dbfull();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // On the second invocation, flush and overwrite every column
          // family; these "_val2" writes happen after the snapshot was taken.
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          // On the eighth invocation each column family's thread-local super
          // version slot must be either in use or obsolete.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    // The snapshot predates the "_val2" overwrite.
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }

  // No column family may be left with its thread-local super version slot
  // marked in use.
  for (int i = 0; i < 8; ++i) {
    auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                    dbfull()->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
// MultiGet where the requested column families are interleaved (1, 2, 1)
// rather than grouped; results must line up with the request order.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // This declaration must be on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"one", "two"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(2, "baz", "xyz"));
  ASSERT_OK(Put(1, "abc", "def"));

  // Column families and keys are deliberately given out of order.
  std::vector<int> cfs{1, 2, 1};
  std::vector<std::string> keys{"foo", "baz", "abc"};
  std::vector<std::string> values;

  values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
                    std::get<1>(GetParam()));

  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(values[0], "bar");
  ASSERT_EQ(values[1], "xyz");
  ASSERT_EQ(values[2], "def");
}
// Batched MultiGet on a single column family with the keys passed in reverse
// order, covering live keys, a deleted key and a never-written key.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSimpleUnsorted) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    // Keys are intentionally not in ascending order.
    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), /*sorted_input=*/false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // Four 2-byte values were found.
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_TRUE(s[0].IsNotFound());  // "no_key"
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());  // "k4" was deleted
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);
    ASSERT_OK(s[5]);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
// Batched MultiGet with sorted keys whose data is spread over two flushed
// files plus unflushed writes, repeated for every ChangeOptions()
// configuration.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSortedMultiFile) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);

    // First file: k1, k2.
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Flush(1));
    // Second file: k3, k4.
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Flush(1));
    // Not flushed: delete k4, add k5, delete a key that never existed.
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), /*sorted_input=*/true);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
    // Four 2-byte values were found.
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());  // "k4" was deleted
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());  // "no_key"

    SetPerfLevel(kDisable);
  } while (ChangeOptions());
}
// Batched MultiGet where the same key appears several times in one request,
// with merge operands spread over multiple files moved to level 2.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedDuplicateKeys) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  Options opts = CurrentOptions();
  opts.merge_operator = MergeOperators::CreateStringAppendOperator();
  CreateAndReopenWithCF({"pikachu"}, opts);
  SetPerfLevel(kEnableCount);

  // Each pair of merges is flushed and moved to level 2 of column family 1.
  // "k4" receives operands in two different files.
  ASSERT_OK(Merge(1, "k1", "v1"));
  ASSERT_OK(Merge(1, "k2", "v2"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k3", "v3"));
  ASSERT_OK(Merge(1, "k4", "v4"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k4", "v4_2"));
  ASSERT_OK(Merge(1, "k6", "v6"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);
  ASSERT_OK(Merge(1, "k7", "v7"));
  ASSERT_OK(Merge(1, "k8", "v8"));
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);

  get_perf_context()->Reset();

  // "k8" is requested three times and "k4" twice.
  std::vector<Slice> keys({"k8", "k8", "k8", "k4", "k4", "k1", "k3"});
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> s(keys.size());

  ReadOptions ro;
  ro.async_io = std::get<1>(GetParam());
  db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                s.data(), /*sorted_input=*/false);

  ASSERT_EQ(values.size(), keys.size());
  ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v8");
  ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v8");
  ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v8");
  ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v4,v4_2");
  ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v4,v4_2");
  ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
  ASSERT_EQ(std::string(values[6].data(), values[6].size()), "v3");
  // 3 * 2 ("v8") + 2 * 7 ("v4,v4_2") + 2 ("v1") + 2 ("v3") = 24 bytes.
  ASSERT_EQ(24, static_cast<int>(get_perf_context()->multiget_read_bytes));

  for (Status& status : s) {
    ASSERT_OK(status);
  }

  SetPerfLevel(kDisable);
}
// Batched MultiGet over keys whose newest version may live in level 2,
// level 1, level 0 or in unflushed writes; each key must resolve to the most
// recently written value.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevel) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // Every key gets a "val_l2_" value; files of 8 keys are moved to level 2.
  int num_keys = 0;
  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  // Every 3rd key is overwritten with "val_l1_" and moved to level 1.
  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  // Every 5th key is overwritten with "val_l0_" and flushed but not moved.
  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  // Every 9th key is overwritten with "val_mem_" and left unflushed.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr, std::get<1>(GetParam()));

  ASSERT_EQ(values.size(), 16);
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    // The most recent write wins: unflushed, then l0, then l1, then l2.
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}
// Batched MultiGet of the same key twice while the second random read on the
// fault-injection file system fails. Files are placed on levels 4, 2 and 1,
// so no file is moved to level 3. Only the size of the result is checked.
TEST_P(DBMultiGetTestWithParam, MultiGetDuplicatesEmptyLevel) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(env_->GetFileSystem()));
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
  Options options = CurrentOptions();
  options.env = env.get();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();

  // Two-level index with the top level pinned and partitions unpinned, a
  // 1 MiB block cache, and one key per data block.
  LRUCacheOptions cache_opts;
  cache_opts.capacity = 1 << 20;

  BlockBasedTableOptions table_opts;
  table_opts.metadata_cache_options.top_level_index_pinning = PinningTier::kAll;
  table_opts.metadata_cache_options.partition_pinning = PinningTier::kNone;
  table_opts.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_opts.cache_index_and_filter_blocks = true;
  table_opts.block_cache = cache_opts.MakeSharedCache();
  table_opts.flush_block_policy_factory.reset(new MyFlushBlockPolicyFactory(1));

  options.table_factory.reset(new BlockBasedTableFactory(table_opts));
  Reopen(options);
  int key;

  // Level 4: base value for key_9.
  key = 9;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(4);

  // Level 2: key_5 plus two merge operands for key_9. The snapshot taken
  // between the two merges is held until the end of the test.
  key = 5;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  key = 9;
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  const Snapshot* snap = dbfull()->GetSnapshot();
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "val_l2_ext_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);

  // Level 1: unrelated keys.
  key = 2;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  key = 6;
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  std::vector<std::string> keys;
  std::vector<std::string> values;

  // The same key twice.
  keys.push_back("key_" + std::to_string(9));
  keys.push_back("key_" + std::to_string(9));

  // Deactivate the file system for exactly the second random read and
  // re-activate it for every other one.
  int num_reads = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "FaultInjectionTestFS::RandomRead", [&](void*) {
        ++num_reads;
        if (num_reads == 2) {
          fault_fs->SetFilesystemActive(false);
        } else {
          fault_fs->SetFilesystemActive(true);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Shrink the cache to zero and restore it.
  // NOTE(review): presumably this evicts cached blocks so the lookups below
  // hit the file system - confirm.
  size_t capacity = table_opts.block_cache->GetCapacity();
  table_opts.block_cache->SetCapacity(0);
  table_opts.block_cache->SetCapacity(capacity);

  values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
  ASSERT_EQ(values.size(), 2);

  SyncPoint::GetInstance()->DisableProcessing();
  dbfull()->ReleaseSnapshot(snap);
  Destroy(options);
}
// Batched MultiGet of the same key twice while the second random read on the
// fault-injection file system fails. Files are placed on levels 4, 3, 2 and
// 1. The first lookup must surface the injected error and the second must
// return the fully merged value.
TEST_P(DBMultiGetTestWithParam, MultiGetDuplicatesNonEmptyLevel) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(env_->GetFileSystem()));
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
  Options options = CurrentOptions();
  options.env = env.get();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();

  // Two-level index with the top level pinned and partitions unpinned, a
  // 1 MiB block cache, and one key per data block.
  LRUCacheOptions cache_opts;
  cache_opts.capacity = 1 << 20;

  BlockBasedTableOptions table_opts;
  table_opts.metadata_cache_options.top_level_index_pinning = PinningTier::kAll;
  table_opts.metadata_cache_options.partition_pinning = PinningTier::kNone;
  table_opts.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_opts.cache_index_and_filter_blocks = true;
  table_opts.block_cache = cache_opts.MakeSharedCache();
  table_opts.flush_block_policy_factory.reset(new MyFlushBlockPolicyFactory(1));

  options.table_factory.reset(new BlockBasedTableFactory(table_opts));
  Reopen(options);
  int key;

  // Level 4: key_8.
  key = 8;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(4);

  // Level 3: two files, one with key_7 and one with the base value of key_9.
  key = 7;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  key = 9;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(3);

  // Level 2: key_5 plus two merge operands for key_9. The snapshot taken
  // between the two merges is held until the end of the test.
  key = 5;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  key = 9;
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "merge1_l2_" + std::to_string(key)));
  const Snapshot* snap = dbfull()->GetSnapshot();
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "merge2_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);

  // Level 1: unrelated keys.
  key = 2;
  ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  key = 6;
  ASSERT_OK(
      Merge("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  std::vector<std::string> keys;
  std::vector<std::string> values;

  // The same key twice.
  keys.push_back("key_" + std::to_string(9));
  keys.push_back("key_" + std::to_string(9));

  // Deactivate the file system for exactly the second random read and
  // re-activate it for every other one.
  int num_reads = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "FaultInjectionTestFS::RandomRead", [&](void*) {
        ++num_reads;
        if (num_reads == 2) {
          fault_fs->SetFilesystemActive(false);
        } else {
          fault_fs->SetFilesystemActive(true);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Shrink the cache to zero and restore it.
  // NOTE(review): presumably this evicts cached blocks so the lookups below
  // hit the file system - confirm.
  size_t capacity = table_opts.block_cache->GetCapacity();
  table_opts.block_cache->SetCapacity(0);
  table_opts.block_cache->SetCapacity(capacity);

  values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
  ASSERT_EQ(values.size(), 2);
  // The first duplicate reports the injected failure; the second resolves to
  // the base value followed by both merge operands.
  ASSERT_EQ(values[0], "Corruption: Not active");
  ASSERT_EQ(values[1], "val_l2_9,merge1_l2_9,merge2_l2_9");

  SyncPoint::GetInstance()->DisableProcessing();
  dbfull()->ReleaseSnapshot(snap);
  Destroy(options);
}
// Batched MultiGet over keys whose merge operands are spread across level 2,
// level 1, level 0 and unflushed writes; each result must be the base value
// followed by all applicable operands, oldest first.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevelMerge) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  // Base values for every key, in files of 8 keys moved to level 2.
  int num_keys = 0;
  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  // A "val_l1_" operand for every 3rd key, moved to level 1.
  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  // A "val_l0_" operand for every 5th key, flushed but not moved.
  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  // A "val_mem_" operand for every 9th key, left unflushed.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(
        Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 32; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr, std::get<1>(GetParam()));

  ASSERT_EQ(values.size(), keys.size());
  for (unsigned int j = 0; j < 48; ++j) {
    int key = j + 32;
    // Build the expected comma-joined value in the order the operands were
    // written.
    std::string value;
    value.append("val_l2_" + std::to_string(key));
    if (key % 3 == 0) {
      value.append(",");
      value.append("val_l1_" + std::to_string(key));
    }
    if (key % 5 == 0) {
      value.append(",");
      value.append("val_l0_" + std::to_string(key));
    }
    if (key % 9 == 0) {
      value.append(",");
      value.append("val_mem_" + std::to_string(key));
    }
    ASSERT_EQ(values[j], value);
  }
}
// Batched MultiGet with value_size_soft_limit set, against data that has not
// been flushed: lookups past the limit must come back as Aborted.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeInMemory) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  SetPerfLevel(kEnableCount);

  // Six 3-byte values, none of them flushed.
  ASSERT_OK(Put(1, "k1", "v_1"));
  ASSERT_OK(Put(1, "k2", "v_2"));
  ASSERT_OK(Put(1, "k3", "v_3"));
  ASSERT_OK(Put(1, "k4", "v_4"));
  ASSERT_OK(Put(1, "k5", "v_5"));
  ASSERT_OK(Put(1, "k6", "v_6"));

  std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> s(keys.size());

  get_perf_context()->Reset();
  ReadOptions ro;
  ro.value_size_soft_limit = 11;
  ro.async_io = std::get<1>(GetParam());
  db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                s.data(), /*sorted_input=*/false);

  ASSERT_EQ(values.size(), keys.size());
  // The first four values (12 bytes in total) are returned.
  for (unsigned int i = 0; i < 4; i++) {
    ASSERT_EQ(std::string(values[i].data(), values[i].size()),
              "v_" + std::to_string(i + 1));
  }

  // The remaining lookups are aborted.
  for (unsigned int i = 4; i < 6; i++) {
    ASSERT_TRUE(s[i].IsAborted());
  }

  ASSERT_EQ(12, static_cast<int>(get_perf_context()->multiget_read_bytes));
  SetPerfLevel(kDisable);
}
// Batched MultiGet with value_size_soft_limit set, against data spread over
// two flushed files and unflushed writes. Checks which lookups return values,
// which are NotFound, and which are Aborted once the limit is exceeded.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSize) {
#ifndef USE_COROUTINES
  if (std::get<1>(GetParam())) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  // The guard below must start on its own line: anything following #endif on
  // the directive line is discarded by the preprocessor.
  if (!std::get<0>(GetParam())) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);

    // First flushed file.
    ASSERT_OK(Put(1, "k6", "v6"));
    ASSERT_OK(Put(1, "k7", "v7_"));
    ASSERT_OK(Put(1, "k3", "v3_"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Flush(1));
    // Second flushed file.
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k11", "v11"));
    ASSERT_OK(Delete(1, "no_key"));
    ASSERT_OK(Put(1, "k8", "v8_"));
    ASSERT_OK(Put(1, "k13", "v13"));
    ASSERT_OK(Put(1, "k14", "v14"));
    ASSERT_OK(Put(1, "k15", "v15"));
    ASSERT_OK(Put(1, "k16", "v16"));
    ASSERT_OK(Put(1, "k17", "v17"));
    ASSERT_OK(Flush(1));
    // Unflushed writes; k2 and k6 end up deleted.
    ASSERT_OK(Put(1, "k1", "v1_"));
    ASSERT_OK(Put(1, "k2", "v2_"));
    ASSERT_OK(Put(1, "k5", "v5_"));
    ASSERT_OK(Put(1, "k9", "v9_"));
    ASSERT_OK(Put(1, "k10", "v10"));
    ASSERT_OK(Delete(1, "k2"));
    ASSERT_OK(Delete(1, "k6"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15",
                             "k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7",
                             "k8", "k9", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    ReadOptions ro;
    ro.value_size_soft_limit = 20;
    ro.async_io = std::get<1>(GetParam());
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), /*sorted_input=*/false);

    ASSERT_EQ(values.size(), keys.size());

    // Keys resolved from the unflushed writes.
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10");
    ASSERT_TRUE(s[9].IsNotFound());  // "k2" was deleted
    ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_");
    ASSERT_TRUE(s[13].IsNotFound());  // "k6" was deleted
    ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_");

    // Keys resolved from the second flushed file before the limit is hit.
    // The length must come from the same slice as the data pointer.
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v11");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13");
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14");

    // Every remaining lookup is aborted.
    ASSERT_TRUE(s[3].IsAborted());
    ASSERT_TRUE(s[6].IsAborted());
    ASSERT_TRUE(s[7].IsAborted());
    ASSERT_TRUE(s[8].IsAborted());
    ASSERT_TRUE(s[10].IsAborted());
    ASSERT_TRUE(s[11].IsAborted());
    ASSERT_TRUE(s[14].IsAborted());
    ASSERT_TRUE(s[15].IsAborted());
    ASSERT_TRUE(s[17].IsAborted());

    // Seven 3-byte values were returned.
    ASSERT_EQ(21, static_cast<int>(get_perf_context()->multiget_read_bytes));
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
// Batched MultiGet with a value_size_soft_limit over data spread across
// several places: every key is Put and moved to L2, every 3rd key gets a
// Merge operand moved to L1, every 5th key gets a Merge operand that stays
// where Flush() puts it, and every 9th key gets a Merge operand that is never
// flushed. The test expects the first 26 requested keys to be returned with
// the fully merged value and all remaining keys to be reported as Aborted.
TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
  const bool use_async_io = std::get<1>(GetParam());
  if (use_async_io) {
    ROCKSDB_GTEST_BYPASS("This test needs to be fixed for async IO");
    return;
  }
  const bool use_batched = std::get<0>(GetParam());
  if (!use_batched) {
    ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
    return;
  }

  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  // Number of writes issued since the last Flush(); a flush is forced every
  // eight writes so that each level ends up with several files.
  int unflushed = 0;

  // Base values for every key, pushed down to L2.
  for (int i = 0; i < 64; ++i) {
    const std::string idx = std::to_string(i);
    ASSERT_OK(Put("key_" + idx, "val_l2_" + idx));
    if (++unflushed == 8) {
      ASSERT_OK(Flush());
      unflushed = 0;
    }
  }
  if (unflushed > 0) {
    ASSERT_OK(Flush());
    unflushed = 0;
  }
  MoveFilesToLevel(2);

  // Merge operands for every third key, pushed down to L1.
  for (int i = 0; i < 64; i += 3) {
    const std::string idx = std::to_string(i);
    ASSERT_OK(Merge("key_" + idx, "val_l1_" + idx));
    if (++unflushed == 8) {
      ASSERT_OK(Flush());
      unflushed = 0;
    }
  }
  if (unflushed > 0) {
    ASSERT_OK(Flush());
    unflushed = 0;
  }
  MoveFilesToLevel(1);

  // Merge operands for every fifth key; flushed but not moved.
  for (int i = 0; i < 64; i += 5) {
    const std::string idx = std::to_string(i);
    ASSERT_OK(Merge("key_" + idx, "val_l0_" + idx));
    if (++unflushed == 8) {
      ASSERT_OK(Flush());
      unflushed = 0;
    }
  }
  if (unflushed > 0) {
    ASSERT_OK(Flush());
    unflushed = 0;
  }
  ASSERT_EQ(0, unflushed);

  // Merge operands for every ninth key; these are never flushed.
  for (int i = 0; i < 64; i += 9) {
    const std::string idx = std::to_string(i);
    ASSERT_OK(Merge("key_" + idx, "val_mem_" + idx));
  }

  // Request key_10 .. key_49 (40 keys).
  std::vector<std::string> keys_str;
  for (int i = 10; i < 50; ++i) {
    keys_str.push_back("key_" + std::to_string(i));
  }
  std::vector<Slice> keys;
  keys.reserve(keys_str.size());
  for (const std::string& key_str : keys_str) {
    keys.emplace_back(key_str);
  }
  std::vector<PinnableSlice> values(keys_str.size());
  std::vector<Status> statuses(keys_str.size());

  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.value_size_soft_limit = 380;
  read_options.async_io = use_async_io;
  db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(),
                keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), keys.size());

  // The first 26 keys must carry the complete, comma-joined merge result.
  const unsigned int kNumExpectedOk = 26;
  for (unsigned int pos = 0; pos < kNumExpectedOk; ++pos) {
    const int key_num = pos + 10;
    const std::string suffix = std::to_string(key_num);
    std::string expected = "val_l2_" + suffix;
    if (key_num % 3 == 0) {
      expected += ",";
      expected += "val_l1_" + suffix;
    }
    if (key_num % 5 == 0) {
      expected += ",";
      expected += "val_l0_" + suffix;
    }
    if (key_num % 9 == 0) {
      expected += ",";
      expected += "val_mem_" + suffix;
    }
    ASSERT_EQ(values[pos], expected);
    ASSERT_OK(statuses[pos]);
  }

  // Everything past the soft limit is expected to be aborted.
  for (unsigned int pos = kNumExpectedOk; pos < 40; ++pos) {
    ASSERT_TRUE(statuses[pos].IsAborted());
  }
}
// Instantiates DBMultiGetTestWithParam for all four combinations of its two
// boolean parameters. In the tests visible here, tuple element 0 selects
// batched MultiGet and tuple element 1 is used as ReadOptions::async_io.
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
testing::Combine(testing::Bool(), testing::Bool()));
#if USE_COROUTINES
// Fixture for MultiGet tests that set ReadOptions::async_io. The bool test
// parameter is used by the tests as ReadOptions::optimize_multiget_for_io.
//
// The constructor builds a fixed data layout (auto compactions disabled):
//   * Key(0)..Key(255) are written with "val_l2_<i>" and moved to level 2.
//   * Every 3rd key below 128 is overwritten with "val_l1_<i>"; in addition,
//     for each i in {128, 160, 192, 224} the range [Key(i), Key(i + 16)) is
//     range-deleted and multiples of 3 in [i + 16, i + 32) are written with
//     "val_l1_<j>". All of these files are moved to level 1.
//   * Every 5th key below 128 is overwritten with "val_l0_<i>" and flushed
//     without being moved.
// A flush is forced after every 8 writes so each level has several files.
class DBMultiGetAsyncIOTest : public DBBasicTest,
                              public ::testing::WithParamInterface<bool> {
 public:
  DBMultiGetAsyncIOTest()
      : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
    BlockBasedTableOptions bbto;
    bbto.filter_policy.reset(NewBloomFilterPolicy(10));
    options_ = CurrentOptions();
    options_.disable_auto_compactions = true;
    options_.statistics = statistics_;
    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options_.env = Env::Default();
    Reopen(options_);

    int num_keys = 0;

    // Base data for all keys, pushed to level 2.
    for (int i = 0; i < 256; ++i) {
      EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i)));
      num_keys++;
      if (num_keys == 8) {
        EXPECT_OK(Flush());
        num_keys = 0;
      }
    }
    if (num_keys > 0) {
      EXPECT_OK(Flush());
      num_keys = 0;
    }
    MoveFilesToLevel(2);

    // Overwrites for every third key in the lower half of the key space.
    for (int i = 0; i < 128; i += 3) {
      EXPECT_OK(Put(Key(i), "val_l1_" + std::to_string(i)));
      num_keys++;
      if (num_keys == 8) {
        EXPECT_OK(Flush());
        num_keys = 0;
      }
    }
    if (num_keys > 0) {
      EXPECT_OK(Flush());
      num_keys = 0;
    }

    // Range deletions in the upper half, each followed by a few Puts in the
    // adjacent 16-key window and flushed into its own file.
    for (int i = 128; i < 256; i += 32) {
      std::string range_begin = Key(i);
      std::string range_end = Key(i + 16);
      EXPECT_OK(dbfull()->DeleteRange(WriteOptions(),
                                      dbfull()->DefaultColumnFamily(),
                                      range_begin, range_end));
      for (int j = i + 16; j < i + 32; ++j) {
        if (j % 3 == 0) {
          EXPECT_OK(Put(Key(j), "val_l1_" + std::to_string(j)));
        }
      }
      EXPECT_OK(Flush());
    }
    MoveFilesToLevel(1);

    // Overwrites for every fifth key in the lower half; flushed, not moved.
    for (int i = 0; i < 128; i += 5) {
      EXPECT_OK(Put(Key(i), "val_l0_" + std::to_string(i)));
      num_keys++;
      if (num_keys == 8) {
        EXPECT_OK(Flush());
        num_keys = 0;
      }
    }
    if (num_keys > 0) {
      EXPECT_OK(Flush());
      num_keys = 0;
    }
    EXPECT_EQ(0, num_keys);
  }

  // Statistics object installed in the DB options by the constructor.
  const std::shared_ptr<Statistics>& statistics() { return statistics_; }

 protected:
  // Puts the DB into the state the tests expect before issuing MultiGet.
  // With io_uring compiled in, the DB is simply reopened. Otherwise the whole
  // DB is scanned once with an iterator (presumably to warm the block cache
  // so the reads do not depend on io_uring - TODO confirm).
  void PrepareDBForTest() {
#ifdef ROCKSDB_IOURING_PRESENT
    Reopen(options_);
#else   // ROCKSDB_IOURING_PRESENT
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid() && iter->status().ok();
         iter->Next()) {
    }
    EXPECT_OK(iter->status());
#endif  // ROCKSDB_IOURING_PRESENT
  }

  // Reopens the DB with the fixture's options.
  void ReopenDB() { Reopen(options_); }

 private:
  std::shared_ptr<Statistics> statistics_;
  Options options_;
};
// Looks up Key(0), Key(40) and Key(80). All three are multiples of 5 and were
// last written with "val_l0_"; with a flush every 8 writes they fall into
// three different flush batches of that last write pass.
TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
  std::vector<std::string> key_strs{Key(0), Key(40), Key(80)};
  std::vector<Slice> keys{key_strs[0], key_strs[1], key_strs[2]};
  std::vector<PinnableSlice> values(key_strs.size());
  std::vector<Status> statuses(key_strs.size());

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 3);
  ASSERT_OK(statuses[0]);
  ASSERT_OK(statuses[1]);
  ASSERT_OK(statuses[2]);
  ASSERT_EQ(values[0], "val_l0_" + std::to_string(0));
  ASSERT_EQ(values[1], "val_l0_" + std::to_string(40));
  ASSERT_EQ(values[2], "val_l0_" + std::to_string(80));

  HistogramData multiget_io_batch_size;
  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);

#ifdef ROCKSDB_IOURING_PRESENT
  if (GetParam()) {
    // optimize_multiget_for_io: one IO batch of three reads, three coroutines.
    ASSERT_EQ(multiget_io_batch_size.count, 1);
    ASSERT_EQ(multiget_io_batch_size.max, 3);
    ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
  } else {
    // Otherwise three separate IO batches are expected.
    ASSERT_EQ(multiget_io_batch_size.count, 3);
  }
#else   // ROCKSDB_IOURING_PRESENT
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif  // ROCKSDB_IOURING_PRESENT
}
// Looks up Key(33), Key(54) and Key(102): multiples of 3 but not of 5, so the
// newest version of each is the "val_l1_" overwrite that was moved to level 1.
TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  // All key strings are created before any Slice refers to them.
  key_strs.push_back(Key(33));
  key_strs.push_back(Key(54));
  key_strs.push_back(Key(102));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  keys.emplace_back(key_strs[2]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(statuses[2], Status::OK());
  ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
  ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));

  HistogramData multiget_io_batch_size;
  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);

#ifdef ROCKSDB_IOURING_PRESENT
  // A single IO batch of three reads and three coroutines are expected.
  ASSERT_EQ(multiget_io_batch_size.count, 1);
  ASSERT_EQ(multiget_io_batch_size.max, 3);
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
#else   // ROCKSDB_IOURING_PRESENT
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif  // ROCKSDB_IOURING_PRESENT
}
#ifdef ROCKSDB_IOURING_PRESENT
// Same lookup as GetFromL1 (Key(33), Key(54), Key(102)), but the sixth table
// file open observed through the "TableCache::GetTableReader:BeforeOpenFile"
// sync point is forced to fail with IOError. The first two keys must still
// succeed and the third must report the IOError.
TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  key_strs.push_back(Key(33));
  key_strs.push_back(Key(54));
  key_strs.push_back(Key(102));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  keys.emplace_back(key_strs[2]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  // Counts table file opens; the callback captures it by reference, so the
  // callback must not stay registered once this function returns.
  int count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
        count++;
        if (count == 6) {
          Status* s = static_cast<Status*>(status);
          *s = Status::IOError();
        }
      });
  // Override max_open_files during option sanitization (presumably so table
  // readers are opened lazily during MultiGet instead of at DB open - TODO
  // confirm against SanitizeOptions).
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  SyncPoint::GetInstance()->DisableProcessing();
  // Drop the callbacks before any assertion can return early, so nothing
  // keeps a reference to `count`.
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(statuses[2], Status::IOError());

  HistogramData multiget_io_batch_size;
  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);

  // Only two reads make it into the IO batch; the third file failed to open.
  ASSERT_EQ(multiget_io_batch_size.count, 1);
  ASSERT_EQ(multiget_io_batch_size.max, 2);
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
#endif
// Looks up Key(21), Key(54) and Key(102), all expected to resolve to their
// "val_l1_" versions. Key(21) is the eighth key of the first flush batch of
// the "val_l1_" write pass (0, 3, ..., 21), i.e. presumably the last key of
// the first level-1 file.
TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  key_strs.push_back(Key(21));
  key_strs.push_back(Key(54));
  key_strs.push_back(Key(102));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  keys.emplace_back(key_strs[2]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(statuses[2], Status::OK());
  ASSERT_EQ(values[0], "val_l1_" + std::to_string(21));
  ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));

#ifdef ROCKSDB_IOURING_PRESENT
  HistogramData multiget_io_batch_size;
  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);

  // Two IO batches are expected, the larger one containing two reads.
  ASSERT_EQ(multiget_io_batch_size.count, 2);
  ASSERT_EQ(multiget_io_batch_size.max, 2);
#endif  // ROCKSDB_IOURING_PRESENT
}
// Looks up Key(33) and Key(102) (multiples of 3, expected "val_l1_") together
// with Key(56) (neither a multiple of 3 nor 5, expected "val_l2_").
TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  key_strs.push_back(Key(33));
  key_strs.push_back(Key(56));
  key_strs.push_back(Key(102));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  keys.emplace_back(key_strs[2]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(statuses[2], Status::OK());
  ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
  ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));

#ifdef ROCKSDB_IOURING_PRESENT
  HistogramData multiget_io_batch_size;
  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);

  // With optimize_multiget_for_io a single batch of three reads is expected;
  // without it, two batches with at most two reads each.
  ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2);
  ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
#endif  // ROCKSDB_IOURING_PRESENT
}
// Looks up Key(19) and Key(26). Neither is a multiple of 3 or 5, so only the
// original "val_l2_" versions exist, although both keys lie inside the key
// range covered by the later overwrite passes.
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  key_strs.push_back(Key(19));
  key_strs.push_back(Key(26));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  PrepareDBForTest();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 2);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(values[0], "val_l2_" + std::to_string(19));
  ASSERT_EQ(values[1], "val_l2_" + std::to_string(26));

#ifdef ROCKSDB_IOURING_PRESENT
  // Exactly two coroutines are expected (presumably the upper-level files are
  // skipped by their bloom filters - TODO confirm).
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
#else   // ROCKSDB_IOURING_PRESENT
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif  // ROCKSDB_IOURING_PRESENT
}
#ifdef ROCKSDB_IOURING_PRESENT
// Looks up Key(139) and Key(163). Both fall inside ranges deleted by the
// fixture ([128, 144) and [160, 176)), so both lookups must be NotFound even
// though "val_l2_" versions were written for them.
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
  std::vector<std::string> key_strs{Key(139), Key(163)};
  std::vector<Slice> keys{key_strs[0], key_strs[1]};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  PrepareDBForTest();

  ReadOptions read_opts;
  read_opts.async_io = true;
  read_opts.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());

  ASSERT_EQ(values.size(), 2);
  ASSERT_EQ(statuses[0], Status::NotFound());
  ASSERT_EQ(statuses[1], Status::NotFound());

  // One coroutine per key is expected.
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
// Looks up Key(139), Key(144) and Key(163). Key(139) and Key(163) lie inside
// ranges deleted by the fixture and must be NotFound; Key(144) is a multiple
// of 3 just past the first deleted range and must return its "val_l1_" value.
TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
  std::vector<std::string> key_strs{Key(139), Key(144), Key(163)};
  std::vector<Slice> keys{key_strs[0], key_strs[1], key_strs[2]};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  PrepareDBForTest();

  ReadOptions read_opts;
  read_opts.async_io = true;
  read_opts.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());

  ASSERT_EQ(values.size(), keys.size());
  ASSERT_EQ(statuses[0], Status::NotFound());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(values[1], "val_l1_" + std::to_string(144));
  ASSERT_EQ(statuses[2], Status::NotFound());

  // One coroutine per key is expected.
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
#endif
// Runs an async_io MultiGet with io_uring turned off through the file-level
// `enable_io_uring` flag (returned by RocksDbIOUringEnable()). The lookups
// must still succeed, with no async read bytes recorded and no MultiGet
// coroutines used.
TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) {
  std::vector<std::string> key_strs;
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  key_strs.push_back(Key(33));
  key_strs.push_back(Key(54));
  key_strs.push_back(Key(102));
  keys.emplace_back(key_strs[0]);
  keys.emplace_back(key_strs[1]);
  keys.emplace_back(key_strs[2]);
  values.resize(keys.size());
  statuses.resize(keys.size());

  // `enable_io_uring` is shared by every test in this file. Restore it on all
  // exit paths (including early returns from failed assertions) so that tests
  // running later in the same process are not silently affected.
  struct IOUringFlagRestorer {
    bool saved;
    ~IOUringFlagRestorer() { enable_io_uring = saved; }
  } restore_io_uring{enable_io_uring};

  enable_io_uring = false;
  ReopenDB();

  ReadOptions ro;
  ro.async_io = true;
  ro.optimize_multiget_for_io = GetParam();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  ASSERT_EQ(values.size(), 3);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(statuses[2], Status::OK());

  HistogramData async_read_bytes;
  statistics()->histogramData(ASYNC_READ_BYTES, &async_read_bytes);

  // No asynchronous reads and no coroutines may have been used.
  ASSERT_EQ(async_read_bytes.count, 0);
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
}
// Runs every DBMultiGetAsyncIOTest test twice; the bool parameter is used by
// the tests as ReadOptions::optimize_multiget_for_io.
INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
testing::Bool());
#endif
// Checks the per-MultiGet histograms (levels read, index/filter blocks read
// per level, SST files read per level) after batched lookups against a column
// family whose data is spread over levels 1 and 2.
TEST_F(DBBasicTest, MultiGetStats) {
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.partition_filters = true;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));

  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  const int total_keys = 2000;
  const size_t kBatchSize = 100;
  std::vector<std::string> keys_str(total_keys);
  std::vector<Slice> keys(total_keys);
  std::vector<PinnableSlice> values(kBatchSize);
  std::vector<Status> s(kBatchSize);
  ReadOptions read_opts;
  Random rnd(309);

  // NOTE(review): the three write passes start at 0, 501 and 1001, so indexes
  // 500 and 1000 are never written and their Slices stay empty. The second
  // MultiGet below spans index 1000; confirm this is intended before changing.

  // First pass: keys [0, 500), flushed every 100 keys, moved to level 2.
  for (int i = 0; i < 500; ++i) {
    const std::string name = "k" + std::to_string(i);
    keys_str[i] = name;
    keys[i] = Slice(keys_str[i]);
    ASSERT_OK(Put(1, name, rnd.RandomString(1000)));
    if (i % 100 == 0) {
      ASSERT_OK(Flush(1));
    }
  }
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);

  // Second pass: keys [501, 1000), also moved to level 2.
  for (int i = 501; i < 1000; ++i) {
    const std::string name = "k" + std::to_string(i);
    keys_str[i] = name;
    keys[i] = Slice(keys_str[i]);
    ASSERT_OK(Put(1, name, rnd.RandomString(1000)));
    if (i % 100 == 0) {
      ASSERT_OK(Flush(1));
    }
  }
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(2, 1);

  // Third pass: keys [1001, total_keys), moved to level 1.
  for (int i = 1001; i < total_keys; ++i) {
    const std::string name = "k" + std::to_string(i);
    keys_str[i] = name;
    keys[i] = Slice(keys_str[i]);
    ASSERT_OK(Put(1, name, rnd.RandomString(1000)));
    if (i % 100 == 0) {
      ASSERT_OK(Flush(1));
    }
  }
  ASSERT_OK(Flush(1));
  MoveFilesToLevel(1, 1);
  Close();

  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_OK(options.statistics->Reset());

  // Batch of 100 keys starting at index 1250 (all written in the third pass).
  db_->MultiGet(read_opts, handles_[1], kBatchSize, &keys[1250], values.data(),
                s.data(), false);
  ASSERT_EQ(values.size(), kBatchSize);

  HistogramData hist_level;
  HistogramData hist_index_and_filter_blocks;
  HistogramData hist_sst;
  options.statistics->histogramData(NUM_LEVEL_READ_PER_MULTIGET, &hist_level);
  options.statistics->histogramData(NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
                                    &hist_index_and_filter_blocks);
  options.statistics->histogramData(NUM_SST_READ_PER_LEVEL, &hist_sst);

  // Upper bounds recorded by the histograms.
  ASSERT_EQ(hist_level.max, 1);
  ASSERT_GT(hist_index_and_filter_blocks.max, 0);
  ASSERT_EQ(hist_sst.max, 2);

  // Lower bounds recorded by the histograms.
  ASSERT_EQ(hist_level.min, 1);
  ASSERT_GT(hist_index_and_filter_blocks.min, 0);
  ASSERT_EQ(hist_sst.min, 1);

  // Reset the output buffers before reusing them.
  for (size_t idx = 0; idx < values.size(); ++idx) {
    values[idx].Reset();
  }
  for (size_t idx = 0; idx < s.size(); ++idx) {
    s[idx] = Status::OK();
  }

  // Batch starting at index 950 straddles the second and third write passes,
  // so two levels are expected to be read.
  db_->MultiGet(read_opts, handles_[1], kBatchSize, &keys[950], values.data(),
                s.data(), false);
  options.statistics->histogramData(NUM_LEVEL_READ_PER_MULTIGET, &hist_level);
  ASSERT_EQ(hist_level.max, 2);
}
// Parameterized fixture for MultiGet tests with a prefix extractor. The bool
// parameter is read by the Batched test to switch the table to a two-level
// index with partitioned filters.
class MultiGetPrefixExtractorTest : public DBBasicTest,
public ::testing::WithParamInterface<bool> {
};
// Verifies batched MultiGet results and bloom-filter perf counters when a
// 2-byte fixed prefix extractor is configured and whole-key filtering is off.
// The same seven keys are looked up against the memtable, then against the
// flushed SST via MultiGet, and finally via individual Get calls.
TEST_P(MultiGetPrefixExtractorTest, Batched) {
  BlockBasedTableOptions bbto;
  const bool partitioned = GetParam();
  if (partitioned) {
    bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    bbto.partition_filters = true;
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  bbto.cache_index_and_filter_blocks = false;

  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  options.memtable_prefix_bloom_size_ratio = 10;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();

  // "k" is shorter than the 2-byte prefix; the "kk*" keys share one prefix.
  const std::vector<std::pair<std::string, std::string>> entries = {
      {"k", "v0"}, {"kk1", "v1"}, {"kk2", "v2"}, {"kk3", "v3"}, {"kk4", "v4"}};
  for (const auto& entry : entries) {
    ASSERT_OK(Put(entry.first, entry.second));
  }

  std::vector<std::string> keys(
      {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  std::vector<std::string> expected(
      {"v0", "v1", "v2", "v3", "v4", "NOT_FOUND", "NOT_FOUND"});

  // Lookup served from the memtable.
  std::vector<std::string> values = MultiGet(keys, nullptr);
  ASSERT_EQ(values, expected);
  ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 4);

  // Same lookup after the data has been flushed to an SST file.
  ASSERT_OK(Flush());
  get_perf_context()->Reset();
  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values, expected);
  ASSERT_EQ(get_perf_context()->bloom_sst_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);

  // Point lookups must produce the same results and filter statistics.
  get_perf_context()->Reset();
  std::vector<std::string> get_results;
  get_results.reserve(keys.size());
  for (const std::string& key : keys) {
    get_results.push_back(Get(key));
  }
  ASSERT_EQ(get_results, expected);
  ASSERT_EQ(get_perf_context()->bloom_sst_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
}
// Runs MultiGetPrefixExtractorTest with the bool parameter both false and
// true (the Batched test uses it to enable partitioned index/filters).
INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
::testing::Bool());
// Parameterized fixture for MultiGet tests with the row cache option config.
// The bool parameter is read by the MultiGetBatched test as `use_snapshots`.
class DBMultiGetRowCacheTest : public DBBasicTest,
public ::testing::WithParamInterface<bool> {};
// Batched MultiGet with option_config_ forced to kRowCache. Two overlapping
// key sets are looked up one after the other and the ROW_CACHE_HIT ticker is
// checked afterwards; the test parameter decides whether a snapshot is set on
// the ReadOptions of the first lookup.
TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
  // Copies the bytes referenced by a PinnableSlice into a std::string.
  auto as_string = [](const PinnableSlice& slice) {
    return std::string(slice.data(), slice.size());
  };

  do {
    option_config_ = kRowCache;
    Options options = CurrentOptions();
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    CreateAndReopenWithCF({"pikachu"}, options);
    SetPerfLevel(kEnableCount);

    // First SST file: k1..k4.
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Flush(1));

    // Second SST file: k5 plus a deletion of k4, with a snapshot taken
    // before the deletion and another after the flush.
    ASSERT_OK(Put(1, "k5", "v5"));
    const Snapshot* snap1 = dbfull()->GetSnapshot();
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Flush(1));
    const Snapshot* snap2 = dbfull()->GetSnapshot();
    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
    std::vector<Status> s(keys.size());

    const bool use_snapshots = GetParam();
    ReadOptions ro;
    if (use_snapshots) {
      ro.snapshot = snap2;
    }
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(as_string(values[4]), "v1");
    ASSERT_EQ(as_string(values[3]), "v3");
    ASSERT_EQ(as_string(values[1]), "v5");
    // Three values found, two bytes each.
    ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);

    // Second lookup with a key set that partially overlaps the first one.
    keys.assign({"no_key", "k5", "k3", "k2"});
    for (size_t idx = 0; idx < keys.size(); ++idx) {
      values[idx].Reset();
      s[idx] = Status::OK();
    }
    get_perf_context()->Reset();
    if (use_snapshots) {
      ro.snapshot = snap1;
    }
    // NOTE(review): this call passes a default ReadOptions(), not `ro`, so
    // the snapshot assigned just above is not used. The ROW_CACHE_HIT
    // expectations below were written against this behavior; confirm before
    // changing it.
    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    ASSERT_EQ(as_string(values[3]), "v2");
    ASSERT_EQ(as_string(values[2]), "v3");
    ASSERT_EQ(as_string(values[1]), "v5");
    // Three values found, two bytes each.
    ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_OK(s[3]);

    const int expected_row_cache_hits = use_snapshots ? 1 : 2;
    if (use_snapshots) {
      ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
    } else {
      ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
    }
    (void)expected_row_cache_hits;

    SetPerfLevel(kDisable);
    dbfull()->ReleaseSnapshot(snap1);
    dbfull()->ReleaseSnapshot(snap2);
  } while (ChangeCompactOptions());
}
// Runs DBMultiGetRowCacheTest with `use_snapshots` true and then false.
INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
testing::Values(true, false));
// Exercises the GetAllKeyVersions() debug utility on the default column
// family (implicit and explicit handle), on a second column family, and with
// explicitly empty begin/end Slices.
TEST_F(DBBasicTest, GetAllKeyVersions) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  const size_t kNumInserts = 4;
  const size_t kNumDeletes = 4;
  const size_t kNumUpdates = 4;
  const size_t kAllVersions = kNumInserts + kNumDeletes + kNumUpdates;
  const size_t kNoLimit = std::numeric_limits<size_t>::max();

  // Default column family: insert, update, then delete keys "0".."3".
  for (size_t n = 0; n < kNumInserts; ++n) {
    ASSERT_OK(Put(std::to_string(n), "value"));
  }
  for (size_t n = 0; n < kNumUpdates; ++n) {
    ASSERT_OK(Put(std::to_string(n), "value1"));
  }
  for (size_t n = 0; n < kNumDeletes; ++n) {
    ASSERT_OK(Delete(std::to_string(n)));
  }

  std::vector<KeyVersion> key_versions;
  ASSERT_OK(GetAllKeyVersions(db_.get(), {}, {}, kNoLimit, &key_versions));
  ASSERT_EQ(kAllVersions, key_versions.size());
  // Each key yields three consecutive versions, the deletion first.
  for (size_t pos = 0; pos < kAllVersions; pos++) {
    const bool is_deletion = (pos % 3 == 0);
    if (is_deletion) {
      ASSERT_EQ(key_versions[pos].GetTypeName(), "TypeDeletion");
    } else {
      ASSERT_EQ(key_versions[pos].GetTypeName(), "TypeValue");
    }
  }

  // Same query through the explicit default column family handle.
  ASSERT_OK(GetAllKeyVersions(db_.get(), handles_[0], {}, {}, kNoLimit,
                              &key_versions));
  ASSERT_EQ(kAllVersions, key_versions.size());

  // Second column family: one fewer insert, update and delete.
  for (size_t n = 0; n < kNumInserts - 1; ++n) {
    ASSERT_OK(Put(1, std::to_string(n), "value"));
  }
  for (size_t n = 0; n < kNumUpdates - 1; ++n) {
    ASSERT_OK(Put(1, std::to_string(n), "value1"));
  }
  for (size_t n = 0; n < kNumDeletes - 1; ++n) {
    ASSERT_OK(Delete(1, std::to_string(n)));
  }
  ASSERT_OK(GetAllKeyVersions(db_.get(), handles_[1], {}, {}, kNoLimit,
                              &key_versions));
  ASSERT_EQ(kAllVersions - 3, key_versions.size());

  // Explicit empty Slices as bounds are expected to return no versions.
  ASSERT_OK(GetAllKeyVersions(db_.get(), handles_[1], Slice(), Slice(),
                              kNoLimit, &key_versions));
  ASSERT_EQ(key_versions.size(), 0);
}
// Every value type in [kTypeDeletion, kTypeMaxValid) must map to a type name
// other than "Invalid".
TEST_F(DBBasicTest, ValueTypeString) {
  KeyVersion probe;
  unsigned char type_code = ValueType::kTypeDeletion;
  while (type_code < ValueType::kTypeMaxValid) {
    probe.type = type_code;
    ASSERT_TRUE(probe.GetTypeName() != "Invalid");
    type_code++;
  }
}
// Issues a MultiGet for two keys against a table whose block size is larger
// than BlockBasedTable::kMultiGetReadStackBufSize. The lookup must complete
// successfully; judging by the test name, the scenario guards against the
// read overrunning that stack buffer.
TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
  Options options = CurrentOptions();
  Random rnd(301);
  BlockBasedTableOptions table_options;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.block_size = 16 * 1024;
  ASSERT_TRUE(table_options.block_size >
              BlockBasedTable::kMultiGetReadStackBufSize);
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  // Each value is 128 random bytes followed by 128 zero bytes.
  std::string zero_str(128, '\0');
  for (int i = 0; i < 100; ++i) {
    std::string value(rnd.RandomString(128) + zero_str);
    // Must not be wrapped in assert(): under NDEBUG the whole expression,
    // including the Put itself, would be compiled out.
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());

  // Reserve up front so that appending never reallocates; the Slices in
  // `keys` point into the strings owned by `key_data`.
  std::vector<std::string> key_data;
  key_data.reserve(2);
  std::vector<Slice> keys;
  // Sized once up front, larger than the number of keys requested below.
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;

  key_data.emplace_back(Key(0));
  keys.emplace_back(key_data.back());
  key_data.emplace_back(Key(50));
  keys.emplace_back(key_data.back());
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);

  // Both keys were written above, so both lookups must succeed.
  for (const Status& status : statuses) {
    ASSERT_OK(status);
  }
}
// Multi-column-family MultiGet with a snapshot and read_tier = kPersistedTier
// while an atomic flush of all three column families runs on another thread.
// The results must be consistent across column families: either every column
// family returns its first ("value1_*") version or every one returns its
// second ("value2_*") version.
TEST_F(DBBasicTest, MultiGetWithSnapshotsAndPersistedTier) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  // First version of key1 in every column family, flushed atomically.
  ASSERT_OK(Put(0, "key1", "value1_cf0"));
  ASSERT_OK(Put(1, "key1", "value1_cf1"));
  ASSERT_OK(Put(2, "key1", "value1_cf2"));
  ASSERT_OK(Flush({0, 1, 2}));
  for (auto cf : {0, 1, 2}) {
    ASSERT_EQ(1, NumTableFilesAtLevel(0, cf));
  }

  // Second version, flushed concurrently with the MultiGet below.
  ASSERT_OK(Put(0, "key1", "value2_cf0"));
  ASSERT_OK(Put(1, "key1", "value2_cf1"));
  ASSERT_OK(Put(2, "key1", "value2_cf2"));
  std::thread flush_thread([&]() { ASSERT_OK(Flush({0, 1, 2})); });

  ReadOptions ro;
  const Snapshot* snapshot = db_->GetSnapshot();
  ro.snapshot = snapshot;
  ro.read_tier = kPersistedTier;

  std::string k = "key1";
  std::vector<Slice> keys(3, Slice(k));
  std::vector<Status> statuses(keys.size());
  std::vector<ColumnFamilyHandle*> cfs(keys.size());
  std::vector<PinnableSlice> pin_values(keys.size());
  for (size_t i = 0; i < keys.size(); ++i) {
    cfs[i] = handles_[i];
  }
  db_->MultiGet(ro, cfs.size(), cfs.data(), keys.data(), pin_values.data(),
                statuses.data());

  // Join before asserting: a failed ASSERT_* returns from this function, and
  // destroying a still-joinable std::thread would terminate the process.
  flush_thread.join();

  for (const auto& s : statuses) {
    ASSERT_OK(s);
  }
  if (pin_values[0] == "value1_cf0") {
    ASSERT_EQ(pin_values[1], "value1_cf1");
    ASSERT_EQ(pin_values[2], "value1_cf2");
  } else {
    ASSERT_EQ(pin_values[0], "value2_cf0");
    ASSERT_EQ(pin_values[1], "value2_cf1");
    ASSERT_EQ(pin_values[2], "value2_cf2");
  }
  db_->ReleaseSnapshot(snapshot);
}
// Writes 10000 keys to each of three column families (flushing every 1000
// keys and once more at the end), reopens with best_efforts_recovery and
// verifies that every key is still readable with its original value.
TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  // NOTE(review): these options are configured but not passed to any Put
  // below - confirm whether disabling the WAL was meant to take effect.
  WriteOptions write_opts;
  write_opts.disableWAL = true;

  const int kKeysPerCf = 10000;
  for (size_t cf = 0; cf < num_cfs; ++cf) {
    const int cf_index = static_cast<int>(cf);
    const std::string value_prefix = std::to_string(cf) + "_";
    for (int i = 0; i < kKeysPerCf; ++i) {
      ASSERT_OK(Put(cf_index, Key(i), value_prefix + std::to_string(i)));
      if (i % 1000 == 0) {
        ASSERT_OK(Flush(cf_index));
      }
    }
  }
  for (size_t cf = 0; cf < num_cfs; ++cf) {
    ASSERT_OK(Flush(static_cast<int>(cf)));
  }
  Close();

  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
                           options);
  num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  // Every key must come back with the value written before the reopen.
  for (size_t cf = 0; cf < num_cfs; ++cf) {
    const int cf_index = static_cast<int>(cf);
    const std::string value_prefix = std::to_string(cf) + "_";
    for (int i = 0; i < kKeysPerCf; ++i) {
      const std::string expected_value_str = value_prefix + std::to_string(i);
      ASSERT_EQ(expected_value_str, Get(cf_index, Key(i)));
    }
  }
}
// Injects a Corruption status at the
// "VersionBuilder::CheckConsistencyBeforeReturn" sync point and verifies that
// reopening with best_efforts_recovery reports that corruption.
TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Flush());

  // Start from a clean sync point state, then install the fault injection.
  auto* sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->SetCallBack(
      "VersionBuilder::CheckConsistencyBeforeReturn", [](void* arg) {
        ASSERT_NE(nullptr, arg);
        Status* injected = static_cast<Status*>(arg);
        *injected = Status::Corruption("Inject corruption");
      });
  sync_point->EnableProcessing();

  options.best_efforts_recovery = true;
  const Status reopen_status = TryReopen(options);
  ASSERT_TRUE(reopen_status.IsCorruption());

  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
}
namespace {
// Event listener that records the path of every table file created, grouped
// by column family name.
class TableFileListener : public EventListener {
 public:
  // Appends the new file's path to the list kept for its column family.
  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    InstrumentedMutexLock guard(&mutex_);
    std::vector<std::string>& paths = cf_to_paths_[info.cf_name];
    paths.push_back(info.file_path);
  }

  // Returns the recorded paths for `cf_name`, creating an empty entry if the
  // column family has not been seen. The mutex is only held while the entry
  // is looked up; the returned reference is used by callers without it.
  std::vector<std::string>& GetFiles(const std::string& cf_name) {
    InstrumentedMutexLock guard(&mutex_);
    std::vector<std::string>& paths = cf_to_paths_[cf_name];
    return paths;
  }

 private:
  InstrumentedMutex mutex_;
  std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
};
// Event listener that records the paths of table files created by flushes
// (creation reason kFlush), grouped by column family name.
class FlushTableFileListener : public EventListener {
 public:
  // Records the file only when it was produced by a flush.
  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    InstrumentedMutexLock guard(&mutex_);
    if (info.reason == TableFileCreationReason::kFlush) {
      cf_to_flushed_files_[info.cf_name].push_back(info.file_path);
    }
  }

  // Returns the flushed file paths for `cf_name`, creating an empty entry if
  // none exists. The mutex is released before the caller uses the reference.
  std::vector<std::string>& GetFlushedFiles(const std::string& cf_name) {
    InstrumentedMutexLock guard(&mutex_);
    std::vector<std::string>& flushed = cf_to_flushed_files_[cf_name];
    return flushed;
  }

 private:
  InstrumentedMutex mutex_;
  std::unordered_map<std::string, std::vector<std::string>>
      cf_to_flushed_files_;
};
// Event listener that records the paths of blob files created by flushes
// (creation reason kFlush), grouped by column family name.
class FlushBlobFileListener : public EventListener {
 public:
  // Records the blob file only when it was produced by a flush.
  void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
    InstrumentedMutexLock guard(&mutex_);
    if (info.reason == BlobFileCreationReason::kFlush) {
      cf_to_flushed_blobs_files_[info.cf_name].push_back(info.file_path);
    }
  }

  // Returns the flushed blob file paths for `cf_name`, creating an empty
  // entry if none exists. The mutex is released before the caller uses the
  // reference.
  std::vector<std::string>& GetFlushedBlobFiles(const std::string& cf_name) {
    InstrumentedMutexLock guard(&mutex_);
    std::vector<std::string>& flushed = cf_to_flushed_blobs_files_[cf_name];
    return flushed;
  }

 private:
  InstrumentedMutex mutex_;
  std::unordered_map<std::string, std::vector<std::string>>
      cf_to_flushed_blobs_files_;
};
}
// Plants a bogus table file with number 100 in the DB directory while the DB
// is closed. After reopening, the file must be gone and the next flushed
// table file must get a file number greater than 100.
TEST_F(DBBasicTest, LastSstFileNotInManifest) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  Close();

  // Write a file that looks like an SST by name but is not in the MANIFEST.
  constexpr uint64_t kSstFileNumber = 100;
  const std::string kSstFile = MakeTableFileName(dbname_, kSstFileNumber);
  const std::string bogus_content = "bad sst file content";
  ASSERT_OK(WriteStringToFile(env_, bogus_content, kSstFile, true));
  ASSERT_OK(env_->FileExists(kSstFile));

  TableFileListener* listener = new TableFileListener();
  options.listeners.emplace_back(listener);
  Reopen(options);

  // The planted file is expected to have been removed by the reopen.
  const Status planted_file_status = env_->FileExists(kSstFile);
  ASSERT_TRUE(planted_file_status.IsNotFound());

  ASSERT_OK(Put("k", "v"));
  ASSERT_OK(Flush());

  std::vector<std::string>& files =
      listener->GetFiles(kDefaultColumnFamilyName);
  ASSERT_EQ(files.size(), 1);

  // Strip the "<dbname>/" prefix in place to obtain the bare file name.
  const size_t dir_prefix_len = (dbname_ + "/").size();
  std::string& created_path = files[0];
  created_path.erase(0, dir_prefix_len);
  const std::string fname = created_path;

  uint64_t number = 0;
  FileType type = kTableFile;
  ASSERT_TRUE(ParseFileName(fname, &number, &type));
  ASSERT_EQ(type, kTableFile);
  ASSERT_GT(number, kSstFileNumber);
}
// Creates three table files ("a", "b", "c") in each of three column families,
// then truncates the newest file of every column family and deletes a
// different number of older files per column family. After reopening with
// best_efforts_recovery, column family 0 must be empty, column family 1 must
// contain only "a", and column family 2 must contain "a" and "b".
TEST_F(DBBasicTest, RecoverWithMissingFiles) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  TableFileListener* listener = new TableFileListener();
  options.disable_auto_compactions = true;
  options.listeners.emplace_back(listener);
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);

  std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
                                           "eevee"};
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);

  // One key per flush, so each column family ends up with three files.
  for (size_t cf = 0; cf < num_cfs; ++cf) {
    const int cf_index = static_cast<int>(cf);
    ASSERT_OK(Put(cf_index, "a", "0_value"));
    ASSERT_OK(Flush(cf_index));
    ASSERT_OK(Put(cf_index, "b", "0_value"));
    ASSERT_OK(Flush(cf_index));
    ASSERT_OK(Put(cf_index, "c", "0_value"));
    ASSERT_OK(Flush(cf_index));
  }

  // Damage the files: truncate the newest one by two bytes, then delete the
  // older files from index size-2 down to the column family's own index.
  for (size_t i = 0; i < all_cf_names.size(); ++i) {
    std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
    ASSERT_EQ(3, files.size());

    const std::string& newest_file = files[files.size() - 1];
    std::string corrupted_data;
    ASSERT_OK(ReadFileToString(env_, newest_file, &corrupted_data));
    ASSERT_OK(WriteStringToFile(
        env_, corrupted_data.substr(0, corrupted_data.size() - 2), newest_file,
        true));

    const int lowest_deleted = static_cast<int>(i);
    int j = static_cast<int>(files.size() - 2);
    while (j >= lowest_deleted) {
      ASSERT_OK(env_->DeleteFile(files[j]));
      --j;
    }
  }

  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies(all_cf_names, options);

  ReadOptions read_opts;
  read_opts.total_order_seek = true;

  // Keys expected to survive in each column family, in iteration order.
  const std::vector<std::vector<std::string>> expected_keys = {
      {}, {"a"}, {"a", "b"}};
  for (size_t cf = 0; cf < expected_keys.size(); ++cf) {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[cf]));
    iter->SeekToFirst();
    for (const std::string& expected_key : expected_keys[cf]) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_EQ(expected_key, iter->key());
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }
}
// Value-parameterized fixture used by the best-efforts recovery test that
// removes the newest flushed files before reopening the DB.
// The parameter is a tuple<bool, bool>: the Basic test assigns element 0 to
// Options::enable_blob_files and uses element 1 to decide whether the newest
// flushed blob file is deleted in addition to the newest flushed table file.
class BestEffortsRecoverIncompleteVersionTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
BestEffortsRecoverIncompleteVersionTest()
: DBTestBase("best_efforts_recover_incomplete_version_test",
false) {}
};
// Best-efforts recovery after the most recently flushed files went missing.
//
// For each of three column families the test:
//   1. writes "a", flushes and compacts [a, d];
//   2. writes a new "a", then "b", then "f", flushing after every Put;
//   3. compacts [a, d] again.
// It then deletes, per column family, the last table file reported by the
// flush listener (and optionally the last flushed blob file), reopens with
// best_efforts_recovery enabled, and expects:
//   - no files on level 0 and exactly one file on level 1;
//   - an iterator that yields only a -> a_value_new and b -> b_value;
//   - the DB to remain writable, flushable and compactable afterwards.
TEST_P(BestEffortsRecoverIncompleteVersionTest, Basic) {
  Options options = CurrentOptions();
  options.enable_blob_files = std::get<0>(GetParam());
  bool delete_blob_file_too = std::get<1>(GetParam());
  DestroyAndReopen(options);

  // The listeners record the files produced by each flush, keyed by column
  // family name. They are handed to options.listeners right below.
  FlushTableFileListener* flush_table_listener = new FlushTableFileListener();
  FlushBlobFileListener* flush_blob_listener = new FlushBlobFileListener();
  options.disable_auto_compactions = true;
  options.listeners.emplace_back(flush_table_listener);
  options.listeners.emplace_back(flush_blob_listener);

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
                                           "eevee"};
  int num_cfs = static_cast<int>(handles_.size());
  ASSERT_EQ(3, num_cfs);

  // Manual compaction range used below; "f" lies outside [a, d].
  std::string start = "a";
  Slice start_slice = start;
  std::string end = "d";
  Slice end_slice = end;
  for (int cf = 0; cf != num_cfs; ++cf) {
    ASSERT_OK(Put(cf, "a", "a_value"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf],
                                &start_slice, &end_slice));
    ASSERT_OK(Put(cf, "a", "a_value_new"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(Put(cf, "b", "b_value"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(Put(cf, "f", "f_value"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf],
                                &start_slice, &end_slice));
  }

  dbfull()->TEST_DeleteObsoleteFiles();

  // Remove the newest flushed table file (and optionally the newest flushed
  // blob file) of every column family.
  for (int i = 0; i < num_cfs; ++i) {
    std::vector<std::string>& files =
        flush_table_listener->GetFlushedFiles(all_cf_names[i]);
    ASSERT_EQ(4, files.size());
    ASSERT_OK(env_->DeleteFile(files.back()));
    if (options.enable_blob_files) {
      std::vector<std::string>& blob_files =
          flush_blob_listener->GetFlushedBlobFiles(all_cf_names[i]);
      ASSERT_EQ(4, blob_files.size());
      if (delete_blob_file_too) {
        // Index with blob_files' own length rather than the length of the
        // unrelated `files` vector.
        ASSERT_OK(env_->DeleteFile(blob_files.back()));
      }
    }
  }

  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies(all_cf_names, options);

  // Check the recovered LSM shape of every column family.
  for (int i = 0; i < num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ColumnFamilyData* cfd = cfh->cfd();
    VersionStorageInfo* vstorage = cfd->current()->storage_info();
    ASSERT_EQ(0, vstorage->LevelFiles(0).size());
    ASSERT_EQ(1, vstorage->LevelFiles(1).size());
  }

  // Only "a" (new value) and "b" are expected to be visible.
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  for (int i = 0; i < num_cfs; ++i) {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[i]));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("a", iter->key());
    ASSERT_EQ("a_value_new", iter->value());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("b", iter->key());
    ASSERT_EQ("b_value", iter->value());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }

  // The recovered DB must accept further writes, flushes and compactions.
  for (int cf = 0; cf < num_cfs; ++cf) {
    ASSERT_OK(Put(cf, "g", "g_value"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
                                nullptr));
    std::string value;
    ASSERT_OK(db_->Get(ReadOptions(), handles_[cf], "g", &value));
    ASSERT_EQ("g_value", value);
  }
}
// Parameter tuples are (enable_blob_files, delete_blob_file_too), matching the
// order in which the Basic test reads them. The (false, true) combination is
// not instantiated; the blob-file deletion branch is only reached when blob
// files are enabled.
INSTANTIATE_TEST_CASE_P(BestEffortsRecoverIncompleteVersionTest,
BestEffortsRecoverIncompleteVersionTest,
testing::Values(std::make_tuple(false, false),
std::make_tuple(true, false),
std::make_tuple(true, true)));
// Best-efforts recovery must still open the DB (and accept writes) when the
// DB directory additionally contains a MANIFEST-001000 file and a 001001.sst
// file that each hold nothing but ten NUL bytes.
TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
  Options opts = CurrentOptions();
  opts.env = env_;
  DestroyAndReopen(opts);
  ASSERT_OK(Put("foo", "value0"));
  ASSERT_OK(Flush());
  Close();

  // Same junk payload for both planted files; the last argument is passed as
  // true exactly as before.
  const std::string junk(10, '\0');
  ASSERT_OK(
      WriteStringToFile(env_, junk, dbname_ + "/MANIFEST-001000", true));
  ASSERT_OK(WriteStringToFile(env_, junk, dbname_ + "/001001.sst", true));

  opts.best_efforts_recovery = true;
  Reopen(opts);
  ASSERT_OK(Put("bar", "value"));
}
// With best_efforts_recovery enabled, the DB must reopen after its CURRENT
// file has been deleted, and both column families must still be listed.
TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
  const std::vector<std::string> expected_cfs = {kDefaultColumnFamilyName,
                                                 "pikachu"};

  Options opts = CurrentOptions();
  opts.env = env_;
  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);

  opts.best_efforts_recovery = true;
  ReopenWithColumnFamilies(expected_cfs, opts);
  ASSERT_EQ(2, handles_.size());

  // One key per column family, each flushed to a table file.
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Put(1, "bar", "value"));
  ASSERT_OK(Flush());
  ASSERT_OK(Flush(1));
  Close();

  // Drop CURRENT and reopen.
  ASSERT_OK(env_->DeleteFile(CurrentFileName(dbname_)));
  ReopenWithColumnFamilies(expected_cfs, opts);

  std::vector<std::string> listed_cfs;
  ASSERT_OK(DB::ListColumnFamilies(DBOptions(opts), dbname_, &listed_cfs));
  ASSERT_EQ(2, listed_cfs.size());
  for (const std::string& cf_name : listed_cfs) {
    ASSERT_TRUE(cf_name == kDefaultColumnFamilyName || cf_name == "pikachu");
  }
}
// After every MANIFEST (descriptor) file is deleted, a best-efforts reopen
// must fail with InvalidArgument when create_if_missing is false, and must
// succeed with create_if_missing set, in which case the previously written
// key is no longer found.
TEST_F(DBBasicTest, RecoverWithNoManifest) {
  Options opts = CurrentOptions();
  opts.env = env_;
  DestroyAndReopen(opts);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Flush());
  Close();

  {
    // Remove every file in the DB directory that parses as a descriptor file.
    std::vector<std::string> children;
    ASSERT_OK(env_->GetChildren(dbname_, &children));
    for (const std::string& child : children) {
      uint64_t file_number = 0;
      FileType file_type = kWalFile;
      const bool parsed = ParseFileName(child, &file_number, &file_type);
      if (!parsed || file_type != kDescriptorFile) {
        continue;
      }
      ASSERT_OK(env_->DeleteFile(dbname_ + "/" + child));
    }
  }

  opts.best_efforts_recovery = true;

  opts.create_if_missing = false;
  Status reopen_status = TryReopen(opts);
  ASSERT_TRUE(reopen_status.IsInvalidArgument());

  opts.create_if_missing = true;
  Reopen(opts);
  ASSERT_EQ("NOT_FOUND", Get("foo"));
}
// Two column families each get key "a" flushed to a table file and key "b"
// left unflushed. The table file of the default column family is deleted
// (the one of "pikachu" is kept). After a best-efforts reopen the default
// column family must be empty and "pikachu" must expose only "a".
TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
  Options opts = CurrentOptions();
  DestroyAndReopen(opts);
  TableFileListener* file_listener = new TableFileListener();
  opts.listeners.emplace_back(file_listener);
  CreateAndReopenWithCF({"pikachu"}, opts);

  std::vector<std::string> cf_names = {kDefaultColumnFamilyName, "pikachu"};
  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);

  const int cf_count = static_cast<int>(cf_names.size());
  for (int cf = 0; cf < cf_count; ++cf) {
    ASSERT_OK(Put(cf, "a", "0_value"));
    ASSERT_OK(Flush(cf));
    // "b" is written but never flushed.
    ASSERT_OK(Put(cf, "b", "0_value"));
  }

  // For column family index i, delete files[j] for j from the last index down
  // to i. With exactly one file per column family this removes the file of
  // column family 0 only.
  for (size_t i = 0; i < cf_names.size(); ++i) {
    std::vector<std::string>& cf_files = file_listener->GetFiles(cf_names[i]);
    ASSERT_EQ(1, cf_files.size());
    const int lowest = static_cast<int>(i);
    for (int j = static_cast<int>(cf_files.size() - 1); j >= lowest; --j) {
      ASSERT_OK(env_->DeleteFile(cf_files[j]));
    }
  }

  opts.best_efforts_recovery = true;
  ReopenWithColumnFamilies(cf_names, opts);

  ReadOptions scan_opts;
  scan_opts.total_order_seek = true;

  // Default column family: nothing visible.
  std::unique_ptr<Iterator> it(db_->NewIterator(scan_opts, handles_[0]));
  it->SeekToFirst();
  ASSERT_FALSE(it->Valid());
  ASSERT_OK(it->status());

  // "pikachu": only "a" is visible.
  it.reset(db_->NewIterator(scan_opts, handles_[1]));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("a", it->key());
  it->Next();
  ASSERT_FALSE(it->Valid());
  ASSERT_OK(it->status());
}
// Regression-style test: inject one Corruption status into
// "VersionBuilder::CheckConsistencyBeforeReturn" during a best-efforts reopen
// and require that the reopen still succeeds and that subsequent reads return
// either OK or NotFound.
TEST_F(DBBasicTest, BestEffortRecoveryFailureWithTableCacheUseAfterFree) {
  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  opts.env = env_;
  // The assertion further down requires that at least two MANIFEST files
  // exist after the writes performed with these settings.
  opts.max_manifest_file_size = 1;
  opts.max_manifest_space_amp_pct = 0;
  DestroyAndReopen(opts);
  ASSERT_OK(db_->DisableFileDeletions());

  // Ten keys of 1000 bytes each, one flush per key.
  for (int i = 0; i < 10; i++) {
    const std::string payload(1000, static_cast<char>('a' + i));
    ASSERT_OK(Put("key" + std::to_string(i), payload));
    ASSERT_OK(Flush());
  }

  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));
  int num_manifests = 0;
  for (const std::string& child : children) {
    if (child.find("MANIFEST") == std::string::npos) {
      continue;
    }
    num_manifests++;
  }
  ASSERT_GE(num_manifests, 2);

  // Overwrite the status exactly once, on the first callback invocation after
  // the third one.
  int callback_hits = 0;
  bool corruption_injected = false;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
        callback_hits++;
        if (callback_hits <= 3 || corruption_injected) {
          return;
        }
        ASSERT_NE(nullptr, arg);
        *(static_cast<Status*>(arg)) =
            Status::Corruption("Injected corruption");
        corruption_injected = true;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  opts.best_efforts_recovery = true;
  Status status = TryReopen(opts);
  ASSERT_OK(status);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  for (int i = 0; i < 10; i++) {
    std::string fetched;
    status = db_->Get(ReadOptions(), "key" + std::to_string(i), &fetched);
    ASSERT_TRUE(status.ok() || status.IsNotFound());
  }
}
// WALs recorded while track_and_verify_wals_in_manifest is on must be gone
// from the version set's WAL set after reopening with tracking off, and must
// stay gone after reopening with tracking on again.
TEST_F(DBBasicTest, DisableTrackWal) {
  // True when the version set currently tracks no WAL at all.
  auto no_wal_tracked = [this]() {
    return dbfull()->GetVersionSet()->GetWalSet().GetWals().empty();
  };

  Options opts = CurrentOptions();
  opts.track_and_verify_wals_in_manifest = true;
  opts.write_buffer_size = 100;
  opts.env = env_;
  DestroyAndReopen(opts);

  for (int i = 0; i < 100; i++) {
    const std::string suffix = std::to_string(i);
    ASSERT_OK(Put("foo" + suffix, "value" + suffix));
  }
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(db_->SyncWAL());
  // At this point some WALs are tracked.
  ASSERT_FALSE(no_wal_tracked());
  Close();

  // Reopen with tracking disabled.
  opts.track_and_verify_wals_in_manifest = false;
  opts.create_if_missing = false;
  ASSERT_OK(TryReopen(opts));
  ASSERT_TRUE(no_wal_tracked());
  Close();

  // Reopen with tracking enabled again.
  opts.track_and_verify_wals_in_manifest = true;
  opts.create_if_missing = false;
  ASSERT_OK(TryReopen(opts));
  ASSERT_TRUE(no_wal_tracked());
  Close();
}
// Bumps the CRC handed to "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum"
// while one WAL-less write is flushed, then expects the next reopen to report
// Corruption.
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
  Options opts = CurrentOptions();
  DestroyAndReopen(opts);
  ASSERT_OK(Put("bar", "value"));
  ASSERT_OK(Flush());

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->SetCallBack(
      "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
        // Off-by-one checksum for every record emitted while enabled.
        uint32_t* checksum = reinterpret_cast<uint32_t*>(arg);
        *checksum = *checksum + 1;
      });
  sync_point->EnableProcessing();

  // The WAL is disabled for this write, so the only log records emitted while
  // the callback is active come from the flush path.
  WriteOptions no_wal;
  no_wal.disableWAL = true;
  Status status = db_->Put(no_wal, "foo", "value");
  ASSERT_OK(status);
  ASSERT_OK(Flush());

  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();

  ASSERT_OK(Put("foo", "value1"));
  ASSERT_OK(Flush());

  status = TryReopen(opts);
  ASSERT_TRUE(status.IsCorruption());
}
// Ten threads call Close() on the same DB; every call must return OK.
TEST_F(DBBasicTest, ConcurrentlyCloseDB) {
  Options opts = CurrentOptions();
  DestroyAndReopen(opts);

  std::vector<std::thread> closers;
  for (int t = 0; t < 10; t++) {
    closers.emplace_back([&]() {
      auto close_status = db_->Close();
      ASSERT_OK(close_status);
    });
  }
  for (std::thread& closer : closers) {
    closer.join();
  }
}
// Bool-parameterized fixture for WAL-tracking tests; the DoNotTrackObsoleteWal
// test assigns the parameter to Options::atomic_flush.
class DBBasicTestTrackWal : public DBTestBase,
                            public testing::WithParamInterface<bool> {
 public:
  DBBasicTestTrackWal() : DBTestBase("db_basic_test_track_wal", false) {}

  // Returns how many WAL files GetSortedWalFiles() reports. A failing status
  // is recorded as a non-fatal test failure.
  int CountWalFiles() {
    VectorLogPtr wal_files;
    EXPECT_OK(dbfull()->GetSortedWalFiles(wal_files));
    const size_t wal_count = wal_files.size();
    return static_cast<int>(wal_count);
  }
};
// Creates ten WALs that are all made obsolete by flushes but kept on disk
// (file deletions are disabled), calls SyncWAL(), closes the DB, deletes the
// WAL files by hand and then requires that the DB reopens successfully.
TEST_P(DBBasicTestTrackWal, DoNotTrackObsoleteWal) {
  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  opts.track_and_verify_wals_in_manifest = true;
  opts.atomic_flush = GetParam();

  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"cf"}, opts);
  ASSERT_EQ(handles_.size(), 2);
  ASSERT_OK(db_->DisableFileDeletions());

  constexpr int kNumWals = 10;
  std::vector<std::unique_ptr<LogFile>> wal_files(kNumWals);
  for (int i = 0; i < kNumWals; ++i) {
    // Alternate between the two column families; remember the WAL that is
    // current before each write, then flush both column families.
    const int cf = i % 2;
    const std::string suffix = std::to_string(i);
    ASSERT_OK(db_->GetCurrentWalFile(&wal_files[i]));
    ASSERT_OK(Put(cf, "k" + suffix, "v" + suffix));
    ASSERT_OK(Flush({0, 1}));
  }
  ASSERT_EQ(CountWalFiles(), kNumWals);
  ASSERT_OK(db_->SyncWAL());
  Close();

  // Remove every remembered WAL from disk.
  for (const std::unique_ptr<LogFile>& wal : wal_files) {
    ASSERT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
  }

  ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf"}, opts));
  Destroy(opts);
}
// Runs every DBBasicTestTrackWal test with the bool parameter set to false and
// to true; DoNotTrackObsoleteWal uses it as Options::atomic_flush.
INSTANTIATE_TEST_CASE_P(DBBasicTestTrackWal, DBBasicTestTrackWal,
testing::Bool());
class DBBasicTestMultiGet : public DBTestBase {
public:
DBBasicTestMultiGet(std::string test_dir, int num_cfs,
bool uncompressed_cache, bool _compression_enabled,
bool _fill_cache, uint32_t compression_parallel_threads)
: DBTestBase(test_dir, false) {
compression_enabled_ = _compression_enabled;
fill_cache_ = _fill_cache;
if (uncompressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
}
env_->count_random_reads_ = true;
Options options = CurrentOptions();
Random rnd(301);
BlockBasedTableOptions table_options;
if (compression_enabled_) {
std::vector<CompressionType> compression_types;
compression_types = GetSupportedCompressions();
CompressionType tmp_type = kNoCompression;
for (auto c_type : compression_types) {
if (c_type != kNoCompression) {
tmp_type = c_type;
break;
}
}
if (tmp_type != kNoCompression) {
options.compression = tmp_type;
} else {
compression_enabled_ = false;
}
}
table_options.block_cache = uncompressed_cache_;
if (table_options.block_cache == nullptr) {
table_options.no_block_cache = true;
} else {
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
}
table_options.flush_block_policy_factory.reset(
new MyFlushBlockPolicyFactory(10));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (!compression_enabled_) {
options.compression = kNoCompression;
} else {
options.compression_opts.parallel_threads = compression_parallel_threads;
}
options_ = options;
Reopen(options);
if (num_cfs > 1) {
for (int cf = 0; cf < num_cfs; ++cf) {
cf_names_.emplace_back("cf" + std::to_string(cf));
}
CreateColumnFamilies(cf_names_, options);
cf_names_.emplace_back("default");
}
std::string zero_str(128, '\0');
for (int cf = 0; cf < num_cfs; ++cf) {
for (int i = 0; i < 100; ++i) {
values_.emplace_back(rnd.RandomString(128) + zero_str);
assert(((num_cfs == 1) ? Put(Key(i), values_[i])
: Put(cf, Key(i), values_[i])) == Status::OK());
}
if (num_cfs == 1) {
EXPECT_OK(Flush());
} else {
EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
}
for (int i = 0; i < 100; ++i) {
uncompressable_values_.emplace_back(rnd.RandomString(256) + '\0');
std::string tmp_key = "a" + Key(i);
assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
: Put(cf, tmp_key, uncompressable_values_[i])) ==
Status::OK());
}
if (num_cfs == 1) {
EXPECT_OK(Flush());
} else {
EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
}
}
if (compressed_cache_) {
compressed_cache_->SetCapacity(0);
compressed_cache_->SetCapacity(1048576);
}
}
bool CheckValue(int i, const std::string& value) {
if (values_[i].compare(value) == 0) {
return true;
}
return false;
}
bool CheckUncompressableValue(int i, const std::string& value) {
if (uncompressable_values_[i].compare(value) == 0) {
return true;
}
return false;
}
const std::vector<std::string>& GetCFNames() const { return cf_names_; }
int num_lookups() { return uncompressed_cache_->num_lookups(); }
int num_found() { return uncompressed_cache_->num_found(); }
int num_inserts() { return uncompressed_cache_->num_inserts(); }
int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
int num_found_compressed() { return compressed_cache_->num_found(); }
int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
bool fill_cache() { return fill_cache_; }
bool compression_enabled() { return compression_enabled_; }
bool has_compressed_cache() { return compressed_cache_ != nullptr; }
bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
Options get_options() { return options_; }
static void SetUpTestCase() {}
static void TearDownTestCase() {}
protected:
class MyBlockCache : public CacheWrapper {
public:
explicit MyBlockCache(std::shared_ptr<Cache> target)
: CacheWrapper(target),
num_lookups_(0),
num_found_(0),
num_inserts_(0) {}
const char* Name() const override { return "MyBlockCache"; }
Status Insert(const Slice& key, Cache::ObjectPtr value,
const CacheItemHelper* helper, size_t charge,
Handle** handle = nullptr, Priority priority = Priority::LOW,
const Slice& compressed = Slice(),
CompressionType type = kNoCompression) override {
num_inserts_++;
return target_->Insert(key, value, helper, charge, handle, priority,
compressed, type);
}
Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
CreateContext* create_context,
Priority priority = Priority::LOW,
Statistics* stats = nullptr) override {
num_lookups_++;
Handle* handle =
target_->Lookup(key, helper, create_context, priority, stats);
if (handle != nullptr) {
num_found_++;
}
return handle;
}
int num_lookups() { return num_lookups_; }
int num_found() { return num_found_; }
int num_inserts() { return num_inserts_; }
private:
int num_lookups_;
int num_found_;
int num_inserts_;
};
std::shared_ptr<MyBlockCache> compressed_cache_;
std::shared_ptr<MyBlockCache> uncompressed_cache_;
Options options_;
bool compression_enabled_;
std::vector<std::string> values_;
std::vector<std::string> uncompressable_values_;
bool fill_cache_;
std::vector<std::string> cf_names_;
};
// Parameterized MultiGet fixture over a single column family (num_cfs == 1).
// The tuple elements are forwarded, in order, to the DBBasicTestMultiGet
// constructor as: uncompressed_cache, _compression_enabled, _fill_cache and
// compression_parallel_threads.
class DBBasicTestWithParallelIO : public DBBasicTestMultiGet,
public testing::WithParamInterface<
std::tuple<bool, bool, bool, uint32_t>> {
public:
DBBasicTestWithParallelIO()
: DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
std::get<0>(GetParam()), std::get<1>(GetParam()),
std::get<2>(GetParam()), std::get<3>(GetParam())) {}
};
// Issues a series of batched MultiGet calls against the default column family
// and checks, after each batch, both the returned values and the number of
// random reads counted by env_->random_read_counter_. The expected read
// deltas depend on whether a block cache is configured, whether fill_cache is
// set, and whether compression is enabled.
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
// key_data owns the key bytes; keys holds Slices pointing into key_data.
std::vector<std::string> key_data(10);
std::vector<Slice> keys;
// values is sized up front for the largest batch used below (10 keys) and its
// elements are Reset() before reuse.
std::vector<PinnableSlice> values(10);
std::vector<Status> statuses;
ReadOptions ro;
ro.fill_cache = fill_cache();
// First batch: Key(0) and Key(50). key_data already holds 10 empty strings,
// so these two are appended at indices 10 and 11.
key_data.emplace_back(Key(0));
keys.emplace_back(key_data.back());
key_data.emplace_back(Key(50));
keys.emplace_back(key_data.back());
statuses.resize(keys.size());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
ASSERT_TRUE(CheckValue(50, values[1].ToString()));
// Baseline read count taken after the first batch.
int random_reads = env_->random_read_counter_.Read();
// Second batch: Key(1) and Key(51); from here on keys[i] points at
// key_data[i].
key_data[0] = Key(1);
key_data[1] = Key(51);
keys[0] = Slice(key_data[0]);
keys[1] = Slice(key_data[1]);
values[0].Reset();
values[1].Reset();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
// A cache is only expected to have served the second batch when fill_cache is
// on and a suitable cache exists.
bool read_from_cache = false;
if (fill_cache()) {
if (has_uncompressed_cache()) {
read_from_cache = true;
} else if (has_compressed_cache() && compression_enabled()) {
read_from_cache = true;
}
}
int expected_reads = random_reads + (read_from_cache ? 0 : 2);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
// Third batch: ten keys from the Key(i) set.
keys.resize(10);
statuses.resize(10);
std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
for (size_t i = 0; i < key_ints.size(); ++i) {
key_data[i] = Key(key_ints[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_ints.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
}
// Expected additional random reads for the third batch.
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 2 : 3);
} else {
expected_reads += (read_from_cache ? 2 : 4);
}
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
// Fourth batch: the same ten indices, but from the "a" + Key(i) set.
keys.resize(10);
statuses.resize(10);
std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
for (size_t i = 0; i < key_uncmp.size(); ++i) {
key_data[i] = "a" + Key(key_uncmp[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_uncmp.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
}
// Both arms of each conditional below add the same amount, i.e. the expected
// delta for this batch does not depend on read_from_cache.
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 3 : 3);
} else {
expected_reads += (read_from_cache ? 4 : 4);
}
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
// Fifth batch: the first five keys of the previous batch, read again.
keys.resize(5);
statuses.resize(5);
std::vector<int> key_tr{1, 2, 15, 16, 55};
for (size_t i = 0; i < key_tr.size(); ++i) {
key_data[i] = "a" + Key(key_tr[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_tr.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
}
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 0 : 2);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
} else {
if (has_uncompressed_cache()) {
expected_reads += (read_from_cache ? 0 : 3);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
} else {
// Without any block cache only a lower bound on the read count is checked.
ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
}
}
}
// Runs MultiGet with Options::use_direct_reads set, on top of an Env that
// only pretends to do direct I/O, and checks the number of random reads
// needed for a second batch of two keys.
TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
// Env wrapper whose random-access files report use_direct_io() == true while
// the underlying file is actually opened with use_direct_reads turned off.
class FakeDirectIOEnv : public EnvWrapper {
class FakeDirectIOSequentialFile;
class FakeDirectIORandomAccessFile;
public:
FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
static const char* kClassName() { return "FakeDirectIOEnv"; }
const char* Name() const override { return kClassName(); }
// Asserts that the caller asked for direct reads, opens the target file with
// direct reads disabled, and wraps it in FakeDirectIORandomAccessFile.
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override {
std::unique_ptr<RandomAccessFile> file;
assert(options.use_direct_reads);
EnvOptions opts = options;
opts.use_direct_reads = false;
Status s = target()->NewRandomAccessFile(fname, &file, opts);
if (!s.ok()) {
return s;
}
result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
return s;
}
private:
// Sequential-file counterpart; nothing in this Env instantiates it.
class FakeDirectIOSequentialFile : public SequentialFileWrapper {
public:
// The wrapper base receives the raw pointer; file_ keeps the object alive.
FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
: SequentialFileWrapper(file.get()), file_(std::move(file)) {}
~FakeDirectIOSequentialFile() {}
bool use_direct_io() const override { return true; }
size_t GetRequiredBufferAlignment() const override { return 1; }
private:
std::unique_ptr<SequentialFile> file_;
};
// Forwards to the wrapped file but claims direct I/O with an alignment
// requirement of 1 byte.
class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
public:
// The wrapper base receives the raw pointer; file_ keeps the object alive.
FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
: RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
~FakeDirectIORandomAccessFile() {}
bool use_direct_io() const override { return true; }
size_t GetRequiredBufferAlignment() const override { return 1; }
private:
std::unique_ptr<RandomAccessFile> file_;
};
};
// Reopen the fixture's DB on the fake direct-I/O Env. `env` must outlive the
// open DB, which is why Close() is called before this function returns.
std::unique_ptr<FakeDirectIOEnv> env(new FakeDirectIOEnv(env_));
Options opts = get_options();
opts.env = env.get();
opts.use_direct_reads = true;
Reopen(opts);
// key_data owns the key bytes; keys holds Slices pointing into key_data.
std::vector<std::string> key_data(10);
std::vector<Slice> keys;
std::vector<PinnableSlice> values(10);
std::vector<Status> statuses;
ReadOptions ro;
ro.fill_cache = fill_cache();
// First batch: Key(0) and Key(50), appended at key_data indices 10 and 11.
key_data.emplace_back(Key(0));
keys.emplace_back(key_data.back());
key_data.emplace_back(Key(50));
keys.emplace_back(key_data.back());
statuses.resize(keys.size());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
ASSERT_TRUE(CheckValue(50, values[1].ToString()));
// Baseline read count taken after the first batch.
int random_reads = env_->random_read_counter_.Read();
// Second batch: Key(1) and Key(51).
key_data[0] = Key(1);
key_data[1] = Key(51);
keys[0] = Slice(key_data[0]);
keys[1] = Slice(key_data[1]);
values[0].Reset();
values[1].Reset();
// Shrink the block cache to zero capacity and restore it before the second
// batch — presumably to evict the blocks cached by the first batch.
if (uncompressed_cache_) {
uncompressed_cache_->SetCapacity(0);
uncompressed_cache_->SetCapacity(1048576);
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
bool read_from_cache = false;
if (fill_cache()) {
if (has_uncompressed_cache()) {
read_from_cache = true;
} else if (has_compressed_cache() && compression_enabled()) {
read_from_cache = true;
}
}
// Two more random reads are expected unless compression is enabled, a
// compressed cache exists and the batch could be served from a cache.
int expected_reads = random_reads;
if (!compression_enabled() || !has_compressed_cache()) {
expected_reads += 2;
} else {
expected_reads += (read_from_cache ? 0 : 2);
}
// The outer comparison duplicates the ASSERT_EQ condition; the assertion only
// runs (and then fails) when the counts differ.
if (env_->random_read_counter_.Read() != expected_reads) {
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
}
Close();
}
// Turns the second "RetrieveMultipleBlocks:VerifyChecksum" callback into a
// Corruption status and expects a two-key MultiGet to return OK for the first
// key and Corruption for the second.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
  // key_data owns the key bytes; keys holds Slices pointing into key_data.
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  int verify_calls = 0;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
        Status* verify_status = static_cast<Status*>(status);
        verify_calls++;
        // Only the second verification is made to fail.
        if (verify_calls == 2) {
          *verify_status = Status::Corruption();
        }
      });
  sync_point->EnableProcessing();

  // Look up Key(0) and Key(50).
  key_data.emplace_back(Key(0));
  keys.emplace_back(key_data.back());
  key_data.emplace_back(Key(50));
  keys.emplace_back(key_data.back());
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);

  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::Corruption());

  sync_point->DisableProcessing();
}
// Forces "TableCache::MultiGet:FindTable" to report IOError and expects both
// keys of a two-key MultiGet to come back with IOError.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
  // key_data owns the key bytes; keys holds Slices pointing into key_data.
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack("TableCache::MultiGet:FindTable",
                          [&](void* status) {
                            Status* find_status = static_cast<Status*>(status);
                            *find_status = Status::IOError();
                          });
  // Override max_open_files to 11 after option sanitization — presumably so
  // that table files are opened during MultiGet rather than at DB open; the
  // rationale is not visible here.
  sync_point->SetCallBack("SanitizeOptions::AfterChangeMaxOpenFiles",
                          [&](void* arg) {
                            int* max_open_files = (int*)arg;
                            *max_open_files = 11;
                          });
  sync_point->EnableProcessing();

  Reopen(CurrentOptions());

  // Look up Key(0) and Key(50).
  key_data.emplace_back(Key(0));
  keys.emplace_back(key_data.back());
  key_data.emplace_back(Key(50));
  keys.emplace_back(key_data.back());
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);

  ASSERT_EQ(statuses[0], Status::IOError());
  ASSERT_EQ(statuses[1], Status::IOError());

  sync_point->DisableProcessing();
}
// Cartesian product of the fixture's four parameters, in constructor order:
// uncompressed_cache {false, true} x compression_enabled {false, true} x
// fill_cache {false, true} x compression_parallel_threads {1, 4}.
INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
::testing::Combine(::testing::Bool(), ::testing::Bool(),
::testing::Bool(),
::testing::Values(1, 4)));
class DeadlineFS;
// Random-access file handed out by DeadlineFS. Each read entry point checks
// the IOOptions timeout against the deadline/IO timeout configured on the
// owning DeadlineFS, forwards to the wrapped file, and then asks the
// DeadlineFS whether a delay/timeout should be injected.
class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
 public:
  // `file` is moved into the owning base wrapper and is left empty for the
  // caller. `fs` is stored by reference and must outlive this object.
  DeadlineRandomAccessFile(DeadlineFS& fs,
                           std::unique_ptr<FSRandomAccessFile>& file)
      : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}

  IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override;

  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                     const IOOptions& options, IODebugContext* dbg) override;

  IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
                     std::function<void(FSReadRequest&, void*)> cb,
                     void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
                     IODebugContext* dbg) override;

 private:
  // The wrapped file is owned by the base class; no second owner is kept
  // here.
  DeadlineFS& fs_;
};
// FileSystem wrapper used to test deadline / IO-timeout handling. It hands
// out DeadlineRandomAccessFile objects, verifies that the timeout carried in
// IOOptions matches the configured deadline/IO timeout, and can inject a
// sleep (and optionally a TimedOut status) at a chosen IO count.
class DeadlineFS : public FileSystemWrapper {
 public:
  // env:            provides the target file system, the clock and the sleep.
  // error_on_delay: when true, the IO at which the delay is injected itself
  //                 returns TimedOut; otherwise only later IOs do.
  explicit DeadlineFS(SpecialEnv* env, bool error_on_delay)
      : FileSystemWrapper(env->GetFileSystem()),
        // Give the trigger state a defined value; it is only consulted after
        // SetDelayTrigger() has armed a deadline or IO timeout.
        delay_trigger_(0),
        io_count_(0),
        deadline_(std::chrono::microseconds::zero()),
        io_timeout_(std::chrono::microseconds::zero()),
        env_(env),
        timedout_(false),
        ignore_deadline_(false),
        error_on_delay_(error_on_delay) {}

  static const char* kClassName() { return "DeadlineFileSystem"; }
  const char* Name() const override { return kClassName(); }

  // Opens the file on the target file system (expected to succeed), wraps it
  // in a DeadlineRandomAccessFile, checks the timeout in opts.io_options when
  // a deadline or IO timeout is active, and returns ShouldDelay()'s verdict.
  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    std::unique_ptr<FSRandomAccessFile> file;
    IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
    EXPECT_OK(s);
    result->reset(new DeadlineRandomAccessFile(*this, file));

    const std::chrono::microseconds deadline = GetDeadline();
    const std::chrono::microseconds io_timeout = GetIOTimeout();
    if (deadline.count() || io_timeout.count()) {
      AssertDeadline(deadline, io_timeout, opts.io_options);
    }
    return ShouldDelay(opts.io_options);
  }

  // Arms the delay: records the deadline and IO timeout to verify against,
  // the zero-based IO count at which to inject the delay, and resets the IO
  // counter and the timed-out flag.
  void SetDelayTrigger(const std::chrono::microseconds deadline,
                       const std::chrono::microseconds io_timeout,
                       const int trigger) {
    delay_trigger_ = trigger;
    io_count_ = 0;
    deadline_ = deadline;
    io_timeout_ = io_timeout;
    timedout_ = false;
  }

  // Called once per IO.
  //   - Returns TimedOut if a delay has already been injected.
  //   - Returns OK without counting when neither deadline nor IO timeout is
  //     configured.
  //   - Otherwise, unless deadlines are ignored, compares the trigger with
  //     the IO counter (post-incrementing it); on a match sleeps for
  //     opts.timeout plus one microsecond, marks the file system as timed
  //     out, and returns TimedOut if error_on_delay was requested.
  IOStatus ShouldDelay(const IOOptions& opts) {
    if (timedout_) {
      return IOStatus::TimedOut();
    } else if (!deadline_.count() && !io_timeout_.count()) {
      return IOStatus::OK();
    }
    if (!ignore_deadline_ && delay_trigger_ == io_count_++) {
      env_->SleepForMicroseconds(static_cast<int>(opts.timeout.count() + 1));
      timedout_ = true;
      if (error_on_delay_) {
        return IOStatus::TimedOut();
      }
    }
    return IOStatus::OK();
  }

  // Configured deadline, or zero while deadlines are being ignored.
  const std::chrono::microseconds GetDeadline() {
    return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_;
  }

  // Configured IO timeout, or zero while deadlines are being ignored.
  const std::chrono::microseconds GetIOTimeout() {
    return ignore_deadline_ ? std::chrono::microseconds::zero() : io_timeout_;
  }

  bool TimedOut() { return timedout_; }

  void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; }

  // Asserts that opts.timeout equals the timeout implied by `deadline` and
  // `io_timeout`: with a deadline, the time remaining until it (capped by
  // io_timeout when that is non-zero); without one, io_timeout itself.
  void AssertDeadline(const std::chrono::microseconds deadline,
                      const std::chrono::microseconds io_timeout,
                      const IOOptions& opts) const {
    std::chrono::microseconds now =
        std::chrono::microseconds(env_->NowMicros());
    std::chrono::microseconds timeout;
    if (deadline.count()) {
      timeout = deadline - now;
      if (io_timeout.count()) {
        timeout = std::min(timeout, io_timeout);
      }
    } else {
      timeout = io_timeout;
    }
    if (opts.timeout != timeout) {
      ASSERT_EQ(timeout, opts.timeout);
    }
  }

 private:
  // IO count (zero-based) at which the delay is injected.
  int delay_trigger_;
  // Number of IOs counted since the last SetDelayTrigger().
  int io_count_;
  std::chrono::microseconds deadline_;
  std::chrono::microseconds io_timeout_;
  SpecialEnv* env_;
  // Set once a delay has been injected; all later IOs report TimedOut.
  bool timedout_;
  bool ignore_deadline_;
  bool error_on_delay_;
};
// Verifies the timeout in `opts` against the deadline/IO timeout configured
// on the owning DeadlineFS (when either is non-zero), forwards the read to
// the wrapped file, and on success lets the DeadlineFS decide whether to
// inject a delay or a TimedOut status.
IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
                                        const IOOptions& opts, Slice* result,
                                        char* scratch,
                                        IODebugContext* dbg) const {
  const std::chrono::microseconds want_deadline = fs_.GetDeadline();
  const std::chrono::microseconds want_io_timeout = fs_.GetIOTimeout();
  IOStatus status;

  const bool limits_active =
      want_deadline.count() || want_io_timeout.count();
  if (limits_active) {
    fs_.AssertDeadline(want_deadline, want_io_timeout, opts);
  }

  if (status.ok()) {
    status = FSRandomAccessFileWrapper::Read(offset, len, opts, result,
                                             scratch, dbg);
  }
  if (status.ok()) {
    status = fs_.ShouldDelay(opts);
  }
  return status;
}
// Asynchronous counterpart of Read(): checks the timeout in `opts` when a
// deadline or IO timeout is configured, forwards the request to the wrapped
// file, and on success consults DeadlineFS::ShouldDelay().
IOStatus DeadlineRandomAccessFile::ReadAsync(
    FSReadRequest& req, const IOOptions& opts,
    std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
  const std::chrono::microseconds want_deadline = fs_.GetDeadline();
  const std::chrono::microseconds want_io_timeout = fs_.GetIOTimeout();
  IOStatus status;

  const bool limits_active =
      want_deadline.count() || want_io_timeout.count();
  if (limits_active) {
    fs_.AssertDeadline(want_deadline, want_io_timeout, opts);
  }

  if (status.ok()) {
    status = FSRandomAccessFileWrapper::ReadAsync(req, opts, cb, cb_arg,
                                                  io_handle, del_fn, dbg);
  }
  if (status.ok()) {
    status = fs_.ShouldDelay(opts);
  }
  return status;
}
// Batched counterpart of Read(): checks the timeout in `options` when a
// deadline or IO timeout is configured, forwards all requests to the wrapped
// file, and on success consults DeadlineFS::ShouldDelay() once for the batch.
IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
                                             size_t num_reqs,
                                             const IOOptions& options,
                                             IODebugContext* dbg) {
  const std::chrono::microseconds want_deadline = fs_.GetDeadline();
  const std::chrono::microseconds want_io_timeout = fs_.GetIOTimeout();
  IOStatus status;

  const bool limits_active =
      want_deadline.count() || want_io_timeout.count();
  if (limits_active) {
    fs_.AssertDeadline(want_deadline, want_io_timeout, options);
  }

  if (status.ok()) {
    status =
        FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
  }
  if (status.ok()) {
    status = fs_.ShouldDelay(options);
  }
  return status;
}
// Bool-parameterized MultiGet fixture for deadline tests: ten column
// families, an uncompressed block cache, compression requested, fill_cache
// on, and one compression thread.
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet,
                                    public testing::WithParamInterface<bool> {
 public:
  DBBasicTestMultiGetDeadline()
      : DBBasicTestMultiGet("db_basic_test_multiget_deadline",
                            10 /* num_cfs */, true /* uncompressed_cache */,
                            true /* _compression_enabled */,
                            true /* _fill_cache */,
                            1 /* compression_parallel_threads */) {}

  // Expects the first `num_ok` statuses to be OK and every remaining one to
  // equal Status::TimedOut(). Failures are non-fatal.
  inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
    const size_t total = statuses.size();
    for (size_t idx = 0; idx < total; ++idx) {
      Status& current = statuses[idx];
      if (idx < num_ok) {
        EXPECT_OK(current);
        continue;
      }
      if (current != Status::TimedOut()) {
        EXPECT_EQ(current, Status::TimedOut());
      }
    }
  }
};
// Runs MultiGet with a ReadOptions::deadline against a DeadlineFS and checks,
// via CheckStatus(), that a leading run of keys comes back OK and every later
// key comes back as Status::TimedOut(). Each round re-arms the file system
// with a different trigger value. The bool test parameter is used as
// ReadOptions::async_io; without coroutine support the async variant is
// skipped.
TEST_P(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
#ifndef USE_COROUTINES
  if (GetParam()) {
    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
    return;
  }
#endif  // USE_COROUTINES
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  Options options = CurrentOptions();

  std::shared_ptr<Cache> cache = NewLRUCache(1048576);
  BlockBasedTableOptions table_options;
  table_options.block_cache = cache;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.env = env.get();
  SetTimeElapseOnlySleepOnReopen(&options);
  ReopenWithColumnFamilies(GetCFNames(), options);

  // Round 1-3 input: ten keys, two consecutive keys per column family handle.
  std::vector<std::string> key_str;
  size_t i;
  for (i = 0; i < 10; ++i) {
    key_str.emplace_back(Key(static_cast<int>(i)));
  }
  std::vector<ColumnFamilyHandle*> cfs(key_str.size());
  std::vector<Slice> keys(key_str.size());
  std::vector<PinnableSlice> pin_values(keys.size());
  for (i = 0; i < key_str.size(); ++i) {
    cfs[i] = handles_[i / 2];
    keys[i] = Slice(key_str[i].data(), key_str[i].size());
  }

  ReadOptions ro;
  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
  ro.async_io = GetParam();
  // NOTE(review): the last SetDelayTrigger argument presumably selects which
  // IO starts being delayed -- confirm against DeadlineFS.
  fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
  std::vector<Status> statuses(key_str.size());
  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
                     pin_values.data(), statuses.data());
  CheckStatus(statuses, 2);

  // Release the pinned values and bounce the cache capacity through zero
  // before the next round (presumably to drop cached blocks so that the next
  // MultiGet has to go back to the file system -- TODO confirm).
  for (PinnableSlice& value : pin_values) {
    value.Reset();
  }
  cache->SetCapacity(0);
  cache->SetCapacity(1048576);

  statuses.clear();
  statuses.resize(keys.size());
  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
  fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2);
  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
                     pin_values.data(), statuses.data());
  CheckStatus(statuses, 6);

  for (PinnableSlice& value : pin_values) {
    value.Reset();
  }
  cache->SetCapacity(0);
  cache->SetCapacity(1048576);

  statuses.clear();
  statuses.resize(keys.size());
  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
  fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3);
  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
                     pin_values.data(), statuses.data());
  CheckStatus(statuses, 8);

  for (PinnableSlice& value : pin_values) {
    value.Reset();
  }
  cache->SetCapacity(0);
  cache->SetCapacity(1048576);

  // Final round: 100 keys, all looked up in the first column family handle
  // through the single-column-family MultiGet overload.
  key_str.clear();
  for (i = 0; i < 100; ++i) {
    key_str.emplace_back(Key(static_cast<int>(i)));
  }
  keys.resize(key_str.size());
  pin_values.clear();
  pin_values.resize(key_str.size());
  for (i = 0; i < key_str.size(); ++i) {
    keys[i] = Slice(key_str[i].data(), key_str[i].size());
  }

  statuses.clear();
  statuses.resize(keys.size());
  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
  fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
  dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
                     pin_values.data(), statuses.data());
  CheckStatus(statuses, 64);
  Close();
}
// Instantiates DBBasicTestMultiGetDeadline for both values of its bool
// parameter (the MultiGetDeadlineExceeded test assigns it to
// ReadOptions::async_io).
INSTANTIATE_TEST_CASE_P(DeadlineIO, DBBasicTestMultiGetDeadline,
::testing::Bool());
// Forces a flush to fail by overwriting the Status handed to the
// "VersionSet::ProcessManifestWrites:AfterSyncManifest" sync point with an
// IOError, then reopens the DB and checks that both the earlier flushed key
// and the key written before the failed flush are readable.
TEST_F(DBBasicTest, ManifestWriteFailure) {
  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  opts.disable_auto_compactions = true;
  opts.env = env_;
  opts.enable_blob_files = true;
  opts.blob_file_size = 0;
  DestroyAndReopen(opts);

  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest",
      [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        Status* const manifest_status = static_cast<Status*>(arg);
        // The status must have been OK before the failure is injected.
        ASSERT_OK(*manifest_status);
        *manifest_status = Status::IOError();
      });
  sync_point->EnableProcessing();

  ASSERT_OK(Put("key", "value"));
  ASSERT_NOK(Flush());

  // Remove the injection before reopening.
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  sync_point->EnableProcessing();

  Reopen(opts);
  ASSERT_EQ("bar", Get("foo"));
  ASSERT_EQ("value", Get("key"));
}
// Checks that the handle returned by DB::DefaultColumnFamily() is distinct
// from the handles the fixture opened, that both it and handles_[0] report
// column family ID 0, and that passing the DB's own default handle to
// DestroyColumnFamilyHandle() is rejected with InvalidArgument.
TEST_F(DBBasicTest, DestroyDefaultCfHandle) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  for (const auto* h : handles_) {
    ASSERT_NE(db_->DefaultColumnFamily(), h);
  }

  // Use gtest assertions rather than assert(): a null handle then fails this
  // test instead of aborting the binary, and the check is kept under NDEBUG.
  ASSERT_NE(nullptr, db_->DefaultColumnFamily());
  ASSERT_EQ(0U, db_->DefaultColumnFamily()->GetID());
  ASSERT_FALSE(handles_.empty());
  ASSERT_NE(nullptr, handles_[0]);
  ASSERT_EQ(0U, handles_[0]->GetID());

  // Handles opened by the fixture can be destroyed normally.
  for (auto* h : handles_) {
    ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
  }
  handles_.clear();

  // The DB-owned default handle must not be destroyable by the caller.
  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
  ASSERT_TRUE(db_->DestroyColumnFamilyHandle(default_cf).IsInvalidArgument());
}
// Injects an IOError through the
// "rocksdb::CreateLoggerFromOptions:AfterGetPath" sync point and checks that
// reopening the DB then fails with IOError and leaves Options::info_log
// unset.
TEST_F(DBBasicTest, FailOpenIfLoggerCreationFail) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "rocksdb::CreateLoggerFromOptions:AfterGetPath", [&](void* arg) {
        // gtest assertion instead of assert(): reports a test failure rather
        // than aborting, and is not compiled out under NDEBUG.
        ASSERT_NE(nullptr, arg);
        auto* s = static_cast<Status*>(arg);
        *s = Status::IOError("Injected");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  ASSERT_EQ(nullptr, options.info_log);
  ASSERT_TRUE(s.IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Exercises DB::VerifyFileChecksums() in three setups: no checksum generator
// factory (expects InvalidArgument), the CRC32c factory (expects OK, also
// after a second flush), and a factory whose generators report a different
// name than the one the files were written with (expects InvalidArgument).
TEST_F(DBBasicTest, VerifyFileChecksums) {
  // Behaves like FileChecksumGenCrc32c except that Name() reports "sha1".
  class MisnamedFileChecksumGenerator : public FileChecksumGenCrc32c {
   public:
    MisnamedFileChecksumGenerator(const FileChecksumGenContext& context)
        : FileChecksumGenCrc32c(context) {}

    const char* Name() const override { return "sha1"; }
  };

  // Factory handing out the misnamed generator above.
  class MisnamedFileChecksumGenFactory : public FileChecksumGenCrc32cFactory {
   public:
    std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
        const FileChecksumGenContext& context) override {
      std::unique_ptr<FileChecksumGenerator> generator(
          new MisnamedFileChecksumGenerator(context));
      return generator;
    }
  };

  const ReadOptions read_opts;
  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  opts.env = env_;
  DestroyAndReopen(opts);
  ASSERT_OK(Put("a", "value"));
  ASSERT_OK(Flush());
  // No checksum generator factory configured yet.
  ASSERT_TRUE(db_->VerifyFileChecksums(read_opts).IsInvalidArgument());

  opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  Reopen(opts);
  ASSERT_OK(db_->VerifyFileChecksums(read_opts));

  // A file written after the factory was enabled must verify as well.
  ASSERT_OK(Put("b", "value"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->VerifyFileChecksums(read_opts));

  opts.file_checksum_gen_factory =
      std::make_shared<MisnamedFileChecksumGenFactory>();
  Reopen(opts);
  ASSERT_TRUE(db_->VerifyFileChecksums(read_opts).IsInvalidArgument());
}
// Writes a single SST file of sixteen 256 KiB values and verifies its
// checksum with ReadOptions::readahead_size set to 256 KiB. At every
// "GenerateOneFileChecksum::Chunk:0" sync point the number of randomly read
// bytes must be a multiple of the readahead size, except for exactly one
// callback where it equals the file size. The total number of random reads
// must equal the file size divided by the readahead size, rounded up.
TEST_F(DBBasicTest, VerifyFileChecksumsReadahead) {
  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  opts.env = env_;
  opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  DestroyAndReopen(opts);

  Random rnd(301);
  int alignment = 256 * 1024;
  for (int key_idx = 0; key_idx < 16; ++key_idx) {
    ASSERT_OK(
        Put("key" + std::to_string(key_idx), rnd.RandomString(alignment)));
  }
  ASSERT_OK(Flush());

  // Locate the one table file that the flush produced.
  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));
  int table_file_count = 0;
  std::string table_file_name;
  for (const auto& child : children) {
    uint64_t file_number;
    FileType file_type;
    if (ParseFileName(child, &file_number, &file_type) &&
        file_type == kTableFile) {
      table_file_count++;
      table_file_name = child;
    }
  }
  ASSERT_EQ(table_file_count, 1);

  uint64_t sst_size;
  ASSERT_OK(env_->GetFileSize(dbname_ + '/' + table_file_name, &sst_size));

  bool last_read = false;
  SyncPoint::GetInstance()->SetCallBack(
      "GenerateOneFileChecksum::Chunk:0", [&](void* /*arg*/) {
        if (env_->random_read_bytes_counter_.load() != sst_size) {
          // Not at end of file yet: bytes read so far must be aligned.
          ASSERT_EQ(env_->random_read_bytes_counter_.load() & (alignment - 1),
                    0);
          return;
        }
        // Whole file consumed; this may happen only once.
        EXPECT_FALSE(last_read);
        last_read = true;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  env_->count_random_reads_ = true;
  env_->random_read_bytes_counter_ = 0;
  env_->random_read_counter_.Reset();

  ReadOptions read_opts;
  read_opts.readahead_size = alignment;
  ASSERT_OK(db_->VerifyFileChecksums(read_opts));

  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_TRUE(last_read);
  ASSERT_EQ(env_->random_read_counter_.Read(),
            (sst_size + alignment - 1) / (alignment));
}
// Exercises Options::disallow_memtable_writes on a per-column-family basis:
// Put/Delete/WriteBatch::Put against a column family opened with the option
// must fail with InvalidArgument while the other column families keep
// working, iterators on such a column family still work, and the option
// cannot be used for the default column family of a fresh DB.
TEST_F(DBBasicTest, DisallowMemtableWrite) {
  const Status::Code ok_code = Status::Code::kOk;
  const Status::Code invalid_arg_code = Status::Code::kInvalidArgument;
  const std::vector<std::string> cf_names = {"default", "cf1", "cf2", "cf3"};

  Options options_allow = GetDefaultOptions();
  options_allow.create_if_missing = true;
  Options options_disallow = options_allow;
  options_disallow.disallow_memtable_writes = true;
  options_disallow.paranoid_memory_checks = true;
  options_disallow.memtable_veirfy_per_key_checksum_on_seek = true;

  DestroyAndReopen(options_allow);
  CreateColumnFamilies({"cf1", "cf2"}, options_allow);
  CreateColumnFamilies({"cf3"}, options_disallow);

  // Phase 1: only cf3 disallows memtable writes.
  const std::vector<Options> cf3_disallowed = {options_allow, options_allow,
                                               options_allow, options_disallow};
  ReopenWithColumnFamilies(cf_names, cf3_disallowed);

  EXPECT_EQ(Put(0, "a0", "1").code(), ok_code);
  EXPECT_EQ(Put(1, "a1", "1").code(), ok_code);
  EXPECT_EQ(Put(2, "a2", "1").code(), ok_code);
  EXPECT_EQ(Put(3, "a3", "1").code(), invalid_arg_code);

  EXPECT_EQ(Get(0, "a0"), "1");
  EXPECT_EQ(Get(1, "a1"), "1");
  EXPECT_EQ(Get(2, "a2"), "1");
  EXPECT_EQ(Get(3, "a3"), "NOT_FOUND");

  EXPECT_EQ(Delete(0, "z0").code(), ok_code);
  EXPECT_EQ(Delete(1, "z1").code(), ok_code);
  EXPECT_EQ(Delete(2, "z2").code(), ok_code);
  EXPECT_EQ(Delete(3, "z3").code(), invalid_arg_code);

  // The rejection already happens when the entry is added to the batch; the
  // batch itself (holding only the accepted entries) is written successfully.
  WriteBatch batch;
  EXPECT_EQ(batch.Put(handles_[0], "b0", "2").code(), ok_code);
  EXPECT_EQ(batch.Put(handles_[1], "b1", "2").code(), ok_code);
  EXPECT_EQ(batch.Put(handles_[2], "b2", "2").code(), ok_code);
  EXPECT_EQ(batch.Put(handles_[3], "b3", "2").code(), invalid_arg_code);
  ASSERT_OK(db_->Write(WriteOptions(), &batch));
  batch.Clear();

  EXPECT_EQ(Get(0, "b0"), "2");
  EXPECT_EQ(Get(1, "b1"), "2");
  EXPECT_EQ(Get(2, "b2"), "2");
  EXPECT_EQ(Get(3, "b3"), "NOT_FOUND");

  // Iterating over the write-disallowed column family is fine.
  {
    std::unique_ptr<Iterator> cf3_iter(
        dbfull()->NewIterator(ReadOptions(), handles_[3]));
    cf3_iter->Seek("a3");
    ASSERT_OK(cf3_iter->status());
  }

  // Phase 2: moving the restriction to cf2 is rejected at this point, but
  // succeeds after an intermediate reopen with writes allowed everywhere.
  const std::vector<Options> cf2_disallowed = {options_allow, options_allow,
                                               options_disallow, options_allow};
  ASSERT_EQ(TryReopenWithColumnFamilies(cf_names, cf2_disallowed).code(),
            invalid_arg_code);

  ReopenWithColumnFamilies(cf_names, options_allow);
  EXPECT_EQ(Get(0, "a0"), "1");
  EXPECT_EQ(Get(1, "a1"), "1");
  EXPECT_EQ(Get(2, "a2"), "1");
  EXPECT_EQ(Get(3, "a3"), "NOT_FOUND");

  ReopenWithColumnFamilies(cf_names, cf2_disallowed);
  EXPECT_EQ(Get(0, "a0"), "1");
  EXPECT_EQ(Get(1, "a1"), "1");
  EXPECT_EQ(Get(2, "a2"), "1");
  EXPECT_EQ(Get(3, "a3"), "NOT_FOUND");

  EXPECT_EQ(Put(0, "c0", "3").code(), ok_code);
  EXPECT_EQ(Put(1, "c1", "3").code(), ok_code);
  EXPECT_EQ(Put(2, "c2", "3").code(), invalid_arg_code);
  EXPECT_EQ(Put(3, "c3", "3").code(), ok_code);

  EXPECT_EQ(Get(0, "c0"), "3");
  EXPECT_EQ(Get(1, "c1"), "3");
  EXPECT_EQ(Get(2, "c2"), "NOT_FOUND");
  EXPECT_EQ(Get(3, "c3"), "3");

  // Phase 3: a fresh DB cannot be opened with the option set on its only
  // (default) column family.
  Destroy(options_allow);
  EXPECT_EQ(TryReopen(options_disallow).code(), invalid_arg_code);
}
// (Disabled test.) With WAL tracking in the MANIFEST and absolute-consistency
// recovery, deleting the current WAL file after a SyncWAL() and Close() must
// make the next open fail with Corruption.
TEST_F(DBBasicTest, DISABLED_ManualWalSync) {
  Options opts = CurrentOptions();
  opts.track_and_verify_wals_in_manifest = true;
  opts.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  DestroyAndReopen(opts);

  ASSERT_OK(Put("x", "y"));
  ASSERT_OK(db_->SyncWAL());
  // After the sync the version set's WAL set must not be empty.
  EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());

  std::unique_ptr<LogFile> current_wal;
  const Status wal_status = db_->GetCurrentWalFile(&current_wal);
  ASSERT_OK(wal_status);
  Close();

  const std::string wal_path = LogFileName(dbname_, current_wal->LogNumber());
  EXPECT_OK(env_->DeleteFile(wal_path));

  ASSERT_TRUE(TryReopen(opts).IsCorruption());
}
// Fixture for the point-lookup and iterator deadline tests, parameterized on
// a (bool, bool) tuple. Those tests read element 0 as "set
// ReadOptions::deadline" and element 1 as "set ReadOptions::io_timeout".
class DBBasicTestDeadline
: public DBBasicTest,
public testing::WithParamInterface<std::tuple<bool, bool>> {};
// Checks that DB::Get() honors ReadOptions::deadline and/or
// ReadOptions::io_timeout (which of the two is set comes from the test
// parameter tuple). For every eligible option configuration the lookup is
// repeated with an increasing delay trigger until the file system no longer
// reports a timeout; each attempt for which the file system did time out must
// return Status::TimedOut(), and the final attempt must succeed.
TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  bool set_deadline = std::get<0>(GetParam());
  bool set_timeout = std::get<1>(GetParam());

  for (int option_config = kDefault; option_config < kEnd; ++option_config) {
    if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
      continue;
    }
    option_config_ = option_config;
    Options options = CurrentOptions();
    if (options.use_direct_reads) {
      continue;
    }
    options.env = env.get();
    options.disable_auto_compactions = true;
    Cache* block_cache = nullptr;

    // Deadline handling is switched off in the file system between the
    // BeforeFilterMatch and AfterFilterMatch sync points.
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:BeforeFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(true); });
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:AfterFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(false); });
    // Overwrite the int handed to this sync point with 11.
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          int* max_open_files = static_cast<int*>(arg);
          *max_open_files = 11;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    SetTimeElapseOnlySleepOnReopen(&options);
    Reopen(options);

    if (options.table_factory) {
      block_cache = options.table_factory->GetOptions<Cache>(
          TableFactory::kBlockCacheOpts());
    }
    // The retry loop below dereferences the cache unconditionally; fail the
    // test cleanly instead of crashing if this configuration has none.
    ASSERT_NE(nullptr, block_cache);

    Random rnd(301);
    for (int i = 0; i < 400; ++i) {
      std::string key = "k" + std::to_string(i);
      ASSERT_OK(Put(key, rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    bool timedout = true;
    // Incremented after every attempt and passed to SetDelayTrigger().
    int io_deadline_trigger = 0;
    while (timedout) {
      ReadOptions ro;
      if (set_deadline) {
        ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
      }
      if (set_timeout) {
        ro.io_timeout = std::chrono::microseconds{5000};
      }
      fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);

      // Bounce the cache capacity through zero before each attempt
      // (presumably to drop cached blocks so the lookup performs file reads
      // -- TODO confirm).
      block_cache->SetCapacity(0);
      block_cache->SetCapacity(1048576);

      std::string value;
      Status s = dbfull()->Get(ro, "k50", &value);
      if (fs->TimedOut()) {
        ASSERT_EQ(s, Status::TimedOut());
      } else {
        timedout = false;
        ASSERT_OK(s);
      }
      io_deadline_trigger++;
    }
    // Disarm the file system before moving on to the next configuration.
    fs->SetDelayTrigger(std::chrono::microseconds::zero(),
                        std::chrono::microseconds::zero(), 0);
  }
  Close();
}
// Checks that iterators honor ReadOptions::deadline and/or
// ReadOptions::io_timeout (which of the two is set comes from the test
// parameter tuple). A Seek("k50") followed by up to 100 Next() calls is
// repeated with an increasing delay trigger until the file system no longer
// reports a timeout; for each attempt where it did, the iterator must be
// invalid with status TimedOut, and the final attempt must end with an OK
// status.
TEST_P(DBBasicTestDeadline, IteratorDeadline) {
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  bool set_deadline = std::get<0>(GetParam());
  bool set_timeout = std::get<1>(GetParam());

  for (int option_config = kDefault; option_config < kEnd; ++option_config) {
    if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
      continue;
    }
    // NOTE(review): unlike PointLookupDeadline, option_config_ is not updated
    // here before CurrentOptions() is called -- confirm whether that is
    // intentional. Left unchanged to preserve the set of configurations run.
    Options options = CurrentOptions();
    if (options.use_direct_reads) {
      continue;
    }
    options.env = env.get();
    options.disable_auto_compactions = true;
    Cache* block_cache = nullptr;

    // Overwrite the int handed to this sync point with 11.
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          int* max_open_files = static_cast<int*>(arg);
          *max_open_files = 11;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    SetTimeElapseOnlySleepOnReopen(&options);
    Reopen(options);

    if (options.table_factory) {
      block_cache = options.table_factory->GetOptions<Cache>(
          TableFactory::kBlockCacheOpts());
    }
    // The retry loop below dereferences the cache unconditionally; fail the
    // test cleanly instead of crashing if this configuration has none.
    ASSERT_NE(nullptr, block_cache);

    Random rnd(301);
    for (int i = 0; i < 400; ++i) {
      std::string key = "k" + std::to_string(i);
      ASSERT_OK(Put(key, rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    bool timedout = true;
    // Incremented after every attempt and passed to SetDelayTrigger().
    int io_deadline_trigger = 0;
    while (timedout) {
      ReadOptions ro;
      if (set_deadline) {
        ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
      }
      if (set_timeout) {
        ro.io_timeout = std::chrono::microseconds{5000};
      }
      fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);

      // Bounce the cache capacity through zero before each attempt
      // (presumably to drop cached blocks so the scan performs file reads
      // -- TODO confirm).
      block_cache->SetCapacity(0);
      block_cache->SetCapacity(1048576);

      // Owned by a unique_ptr so the iterator is released even when one of
      // the ASSERT_* macros below returns from the test early.
      std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ro));
      int count = 0;
      iter->Seek("k50");
      while (iter->Valid() && count++ < 100) {
        iter->Next();
      }
      if (fs->TimedOut()) {
        ASSERT_FALSE(iter->Valid());
        ASSERT_EQ(iter->status(), Status::TimedOut());
      } else {
        timedout = false;
        ASSERT_OK(iter->status());
      }
      io_deadline_trigger++;
    }
    // Disarm the file system before moving on to the next configuration.
    fs->SetDelayTrigger(std::chrono::microseconds::zero(),
                        std::chrono::microseconds::zero(), 0);
  }
  Close();
}
// Instantiates the DBBasicTestDeadline tests for three (deadline, timeout)
// combinations: deadline only, timeout only, and both. The (false, false)
// combination is not instantiated.
INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, true),
std::make_tuple(true, true)));
// FileSystem wrapper that records the file checksum and checksum function
// name carried in the FileOptions of NewRandomAccessFile() calls whose file
// name contains ".sst", along with how many such calls were seen. Every call
// is forwarded unchanged to the wrapped file system. Recorded state is
// guarded by a mutex.
class ChecksumCapturingFS : public FileSystemWrapper {
 public:
  explicit ChecksumCapturingFS(const std::shared_ptr<FileSystem>& base)
      : FileSystemWrapper(base) {}

  static const char* kClassName() { return "ChecksumCapturingFS"; }
  const char* Name() const override { return kClassName(); }

  // Records the checksum fields of `opts` for ".sst" files (later calls
  // overwrite earlier ones), then delegates to the wrapped file system.
  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    const bool looks_like_sst = fname.find(".sst") != std::string::npos;
    if (looks_like_sst) {
      std::lock_guard<std::mutex> guard(mu_);
      captured_file_checksum_ = opts.file_checksum;
      captured_file_checksum_func_name_ = opts.file_checksum_func_name;
      ++capture_count_;
    }
    return target()->NewRandomAccessFile(fname, opts, result, dbg);
  }

  // Returns a copy of the most recently recorded file checksum.
  std::string GetCapturedFileChecksum() {
    std::lock_guard<std::mutex> guard(mu_);
    return captured_file_checksum_;
  }

  // Returns a copy of the most recently recorded checksum function name.
  std::string GetCapturedFileChecksumFuncName() {
    std::lock_guard<std::mutex> guard(mu_);
    return captured_file_checksum_func_name_;
  }

  // Returns the number of ".sst" opens recorded since construction/Reset().
  int GetCaptureCount() {
    std::lock_guard<std::mutex> guard(mu_);
    return capture_count_;
  }

  // Clears both recorded strings and zeroes the counter.
  void Reset() {
    std::lock_guard<std::mutex> guard(mu_);
    captured_file_checksum_.clear();
    captured_file_checksum_func_name_.clear();
    capture_count_ = 0;
  }

 private:
  std::mutex mu_;
  std::string captured_file_checksum_;
  std::string captured_file_checksum_func_name_;
  int capture_count_ = 0;
};
// With a CRC32c checksum generator factory configured, reopening a DB that
// has one flushed SST file must open that file through
// NewRandomAccessFile() with FileOptions carrying a non-empty file checksum
// and the checksum function name "FileChecksumCrc32c".
TEST_F(DBBasicTest, FileChecksumInFileOptions) {
  std::shared_ptr<ChecksumCapturingFS> capturing_fs =
      std::make_shared<ChecksumCapturingFS>(env_->GetFileSystem());
  std::unique_ptr<Env> wrapped_env(new CompositeEnvWrapper(env_, capturing_fs));

  Options opts = GetDefaultOptions();
  opts.create_if_missing = true;
  opts.env = wrapped_env.get();
  opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  DestroyAndReopen(opts);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Flush());

  // Only look at file opens that happen from the reopen onwards.
  capturing_fs->Reset();
  Reopen(opts);
  ASSERT_EQ("value1", Get("key1"));

  const int sst_open_count = capturing_fs->GetCaptureCount();
  ASSERT_GT(sst_open_count, 0);

  const std::string checksum = capturing_fs->GetCapturedFileChecksum();
  ASSERT_FALSE(checksum.empty());

  const std::string func_name =
      capturing_fs->GetCapturedFileChecksumFuncName();
  ASSERT_NE(func_name, checksum);
  ASSERT_EQ(func_name, "FileChecksumCrc32c");

  Close();
}
}
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest from the command line, registers custom objects, and runs every
// registered test, returning GoogleTest's result as the exit code.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}