#include "db/memtable_list.h"
#include <algorithm>
#include <string>
#include <vector>
#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// Builds the packed value representation used by kTypeValuePreferredSeqno
// entries: the user value immediately followed by a fixed64-encoded
// `write_time` suffix.
//
// `value` is taken by value, so the suffix is appended in place and the
// string is returned directly (the original copied it into a second local
// string for no benefit).
std::string ValueWithWriteTime(std::string value, uint64_t write_time) {
  PutFixed64(&value, write_time);
  return value;
}
}  // namespace
// Test fixture providing a scratch DB (created lazily by CreateDB()) with
// three column families ("default", "one", "two"), plus helpers that drive
// MemTableList's flush-result installation paths against a real VersionSet.
class MemTableListTest : public testing::Test {
 public:
  std::string dbname;                        // per-thread scratch DB path
  std::unique_ptr<DB> db;                    // lazily opened by CreateDB()
  Options options;
  std::vector<ColumnFamilyHandle*> handles;  // handles for the three CFs
  std::atomic<uint64_t> file_number;         // next fake SST file number

  MemTableListTest() : file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    // Start from a clean slate in case a previous run left files behind.
    EXPECT_OK(DestroyDB(dbname, options));
  }

  // Lazily (re)creates the test DB with column families "default", "one" and
  // "two". When udt_enabled_ is set, the default CF uses the u64-timestamp
  // comparator. Safe to call repeatedly; only the first call does work.
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      EXPECT_OK(DestroyDB(dbname, options));
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      if (udt_enabled_) {
        cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
      }
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      // Give each extra CF its own cf_path so their files are isolated.
      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);
      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  ~MemTableListTest() override {
    if (db) {
      // Capture descriptors before closing so DestroyDB can also clean up
      // the per-CF cf_paths registered in CreateDB().
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        EXPECT_OK(handles[i]->GetDescriptor(&cf_descs[i]));
      }
      for (auto h : handles) {
        if (h) {
          EXPECT_OK(db->DestroyColumnFamilyHandle(h));
        }
      }
      handles.clear();
      db.reset();
      EXPECT_OK(DestroyDB(dbname, options, cf_descs));
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() for memtables `m`
  // against a freshly recovered mock VersionSet, simulating what a flush job
  // does after writing an SST. Memtables that may be freed are appended to
  // *to_delete. Must be called from a single thread (takes its own mutex).
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const autovector<ReadOnlyMemTable*>& m,
      autovector<ReadOnlyMemTable*>* to_delete) {
    // Create a mock VersionSet backed by the lazily created test DB.
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);
    CreateDB();
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    // Trailing args are presumably block_cache_tracer, io_tracer, db_id,
    // db_session_id, daily_offpeak_time_utc, error_handler, read_only --
    // confirm against the VersionSet constructor if it changes.
    VersionSet versions(dbname, &immutable_db_options,
                        MutableDBOptions{db_options}, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, nullptr,
                        nullptr, "",
                        "", "",
                        nullptr, false);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData.
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    IOStatus io_s;
    // The installation path expects to run under the DB mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
    Status s = list->TryInstallMemtableFlushResults(
        cfd, m, &dummy_prep_tracker, &versions, &mutex, file_num, to_delete,
        nullptr, &log_buffer, &flush_jobs_info);
    // NOTE(review): io_s is never written by the call above, so this check is
    // vacuous (default-constructed IOStatus is OK) -- kept for parity.
    EXPECT_OK(io_s);
    return s;
  }

  // Same as above, but exercises the free function
  // InstallMemtableAtomicFlushResults() across several column families at
  // once. `lists`, `cf_ids` and `mems_list` are parallel arrays.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
      autovector<ReadOnlyMemTable*>* to_delete) {
    // Create a mock VersionSet (same shape as the single-CF helper above).
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);
    CreateDB();
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options,
                        MutableDBOptions{db_options}, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, nullptr,
                        nullptr, "",
                        "", "",
                        nullptr, false);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Resolve each requested cf_id to its ColumnFamilyData.
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }

    // One fake output file (with a fresh file number) per column family.
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
        committed_flush_jobs_info_storage(cf_ids.size());
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
        committed_flush_jobs_info;
    for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
      committed_flush_jobs_info.push_back(
          &committed_flush_jobs_info_storage[i]);
    }

    // The installation path expects to run under the DB mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mems_list, &versions, nullptr , &mutex,
        file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr,
        &log_buffer);
  }

 protected:
  // When true, CreateDB() configures the default CF with a u64
  // user-defined-timestamp comparator (see MemTableListWithTimestampTest).
  bool udt_enabled_ = false;
};
TEST_F(MemTableListTest, Empty) {
  // A brand-new list holds no immutable memtables and requests no flush.
  MemTableList list(1 /* min_write_buffer_number_to_merge */,
                    0 /* max_write_buffer_size_to_maintain */);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Picking from the empty list yields nothing.
  autovector<ReadOnlyMemTable*> picked;
  list.PickMemtablesToFlush(std::numeric_limits<uint64_t>::max(), &picked);
  ASSERT_TRUE(picked.empty());

  // Dropping the only reference to the current version frees no memtables.
  autovector<ReadOnlyMemTable*> deletion_candidates;
  list.current()->Unref(&deletion_candidates);
  ASSERT_TRUE(deletion_candidates.empty());
}
TEST_F(MemTableListTest, GetTest) {
  // Create a list that keeps no flushed history
  // (max_write_buffer_size_to_maintain == 0).
  int min_write_buffer_number_to_merge = 2;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<ReadOnlyMemTable*> to_delete;

  // Lookup in an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, nullptr /* columns */,
                                   nullptr /* timestamp */, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a memtable and populate it directly.
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ImmutableOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Writes: delete key1, then overwrite key1/key2, plus a
  // kTypeValuePreferredSeqno entry for key3 carrying a write time of 20.
  ASSERT_OK(
      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValuePreferredSeqno, "key3",
                     ValueWithWriteTime("value3.1", 20),
                     nullptr /* kv_prot_info */));

  // Fetch the newest versions straight from the memtable.
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  // At snapshot seq=2 only the deletion of key1 is visible.
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", 2), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // The preferred-seqno entry reads back as its plain value.
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key3", seq), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value3.1");

  ASSERT_EQ(5, mem->NumEntries());
  ASSERT_EQ(1, mem->NumDeletion());

  // Hand the memtable over to the list as immutable.
  mem->ConstructFragmentedRangeTombstones();
  mem->SetID(1);
  list.Add(mem, &to_delete);

  // Remember the snapshot that predates the second memtable's writes.
  SequenceNumber saved_seq = seq;

  // Create a second memtable with newer writes, including a merge on key3.
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->SetID(2);
  mem2->Ref();

  ASSERT_OK(
      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3",
                      nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeMerge, "key3", "value3.2",
                      nullptr /* kv_prot_info */));
  mem2->ConstructFragmentedRangeTombstones();
  list.Add(mem2, &to_delete);

  // Lookups through the list consult the newest memtable first: key1 is now
  // deleted at the latest snapshot...
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  // ...but still resolves to mem1's value at the older snapshot.
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->Get(LookupKey("key1", saved_seq), &value,
                              nullptr /* columns */, nullptr /* timestamp */,
                              &s, &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  // Snapshot seq=1 predates every write to key2.
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->Get(LookupKey("key2", 1), &value,
                              nullptr /* columns */, nullptr /* timestamp */,
                              &s, &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_FALSE(found);

  // Merge of "value3.1" (mem1) with "value3.2" (mem2) via the string-append
  // merge operator configured above.
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value3.1,value3.2");

  ASSERT_EQ(2, list.NumNotFlushed());

  // Drop the last reference and free both memtables.
  list.current()->Unref(&to_delete);
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
}
TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Keep up to 2 * Arena::kInlineSize bytes of flushed memtables in history
  // so GetFromHistory() has something to read after a flush is installed.
  int min_write_buffer_number_to_merge = 2;
  int64_t max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<ReadOnlyMemTable*> to_delete;

  // Lookup in an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, nullptr /* columns */,
                                   nullptr /* timestamp */, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Build a memtable: delete key1, write key2 twice.
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  ASSERT_OK(
      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
                     nullptr /* kv_prot_info */));

  // Direct memtable reads see the deletion and the newest key2.
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                   nullptr /* timestamp */, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add the memtable to the list; reads now go through the list.
  mem->ConstructFragmentedRangeTombstones();
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush the memtable and install the result; it moves to the flushed
  // history rather than being deleted because history space was configured.
  autovector<ReadOnlyMemTable*> to_flush;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // The keys are no longer visible to regular (live-memtable) lookups...
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // ...but are still readable from history.
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, nullptr /* columns */,
      nullptr /* timestamp */, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, nullptr /* columns */,
      nullptr /* timestamp */, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Second memtable: delete key1, write key3; flush and install it too.
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  ASSERT_OK(
      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3",
                      nullptr /* kv_prot_info */));
  mem2->ConstructFragmentedRangeTombstones();
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Adding a third (empty) memtable overflows the history budget, so the
  // oldest flushed memtable is evicted into to_delete.
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  mem3->ConstructFragmentedRangeTombstones();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());

  // Live reads find nothing (mem3 is empty)...
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // ...while history still holds the surviving flushed memtable (key1
  // tombstone and key3 value); key2's memtable was evicted above.
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, nullptr /* columns */,
      nullptr /* timestamp */, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, nullptr /* columns */,
      nullptr /* timestamp */, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, nullptr /* columns */,
                          nullptr /* timestamp */, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Drop the last reference: all three remaining memtables become deletable.
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
}
TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<ReadOnlyMemTable*> to_delete;

  // A flush becomes "pending" only once 3 immutable memtables accumulate
  // (min_write_buffer_number_to_merge) or a flush is explicitly requested.
  // History can hold up to 7 write buffers' worth of flushed memtables.
  int min_write_buffer_number_to_merge = 3;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  // Create 6 populated memtables with ids 0..5 (not added to the list yet).
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i), "valueN",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i), "valueM",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
                       nullptr /* kv_prot_info */));

    tables.push_back(mem);
  }

  // Nothing to flush yet.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<ReadOnlyMemTable*> to_flush;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Requesting a flush on an empty list does not make one pending.
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to flush the empty list again.
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush even though there is still nothing to flush.
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Adding two memtables satisfies the outstanding flush request, so a flush
  // becomes pending even though 2 < min_write_buffer_number_to_merge.
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Picking clears the pending state but keeps the memtables in the list.
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert the pick; the memtables become flushable again. (The `false`
  // second argument is presumably rollback_succeeding_memtables -- confirm
  // against RollbackMemtableFlush's declaration.)
  list.RollbackMemtableFlush(to_flush, false);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // A third memtable reaches min_write_buffer_number_to_merge: pending.
  list.Add(tables[2], &to_delete);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick all three.
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // A second pick while everything is already picked returns nothing.
  autovector<ReadOnlyMemTable*> to_flush2;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // A fourth memtable alone does not make a flush pending (the other three
  // are mid-flush), but the flush-needed flag is raised.
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // An explicit request makes the unpicked memtable flush-pending.
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick up the fourth memtable.
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Roll back the first three; they are pending again.
  list.RollbackMemtableFlush(to_flush, false);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add a fifth memtable.
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Picking grabs the three rolled-back memtables; the flag stays set
  // because an unpicked memtable remains.
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick the remaining unpicked memtable separately.
  autovector<ReadOnlyMemTable*> to_flush3;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
  ASSERT_EQ(1, to_flush3.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Nothing left to pick.
  autovector<ReadOnlyMemTable*> to_flush4;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
  ASSERT_EQ(0, to_flush4.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Install the flush of the first three; depending on the history budget
  // they move to the flushed list or become deletable.
  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // The remaining two memtables are already picked, so a request does not
  // make them pending.
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Install to_flush3 (the newer memtable); NumNotFlushed stays at 2 because
  // the memtable in to_flush2 has not been installed yet.
  s = MemTableListTest::Mock_InstallMemtableFlushResults(&list, to_flush3,
                                                         &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Install to_flush2; now every picked memtable is flushed.
  s = MemTableListTest::Mock_InstallMemtableFlushResults(&list, to_flush2,
                                                         &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Verify this thread holds the last reference before deleting.
  for (const auto& m : to_delete) {
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add the final memtable (id 5) and exercise id-bounded picking.
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID(false /* for_atomic_flush */));

  // An upper bound below the smallest live memtable id picks nothing, but
  // still consumes the outstanding flush request.
  memtable_id = 4;
  autovector<ReadOnlyMemTable*> to_flush5;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(memtable_id, &to_flush5);
  ASSERT_TRUE(to_flush5.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // A bound that covers id 5 picks the memtable.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(memtable_id, &to_flush5);
  ASSERT_EQ(1, static_cast<int>(to_flush5.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());

  // Tear down: everything still referenced by the current version becomes
  // deletable, capped by what history could hold.
  to_delete.clear();
  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Verify this thread holds the last reference before deleting.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}
TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
  // Installing atomic flush results for zero column families must succeed
  // trivially and leave nothing to delete.
  autovector<MemTableList*> empty_lists;
  autovector<uint32_t> empty_cf_ids;
  autovector<const autovector<ReadOnlyMemTable*>*> empty_mems;
  autovector<ReadOnlyMemTable*> to_delete;
  ASSERT_OK(Mock_InstallMemtableAtomicFlushResults(empty_lists, empty_cf_ids,
                                                   empty_mems, &to_delete));
  ASSERT_TRUE(to_delete.empty());
}
TEST_F(MemTableListTest, AtomicFlushTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // One MemTableList per column family.
  int min_write_buffer_number_to_merge = 3;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_size_to_maintain));
  }

  // Create two populated memtables per column family, ids 0 and 1.
  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
                         nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i),
                         "valueN", nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
                         nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i),
                         "valueM", nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
                         nullptr /* kv_prot_info */));

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  // Nothing to flush yet in any list.
  std::vector<autovector<ReadOnlyMemTable*>> flush_candidates(num_cfs);
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(
        std::numeric_limits<uint64_t>::max() /* memtable_id */,
        &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flushes even though there is nothing to flush yet.
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  // Add the memtables; the earlier requests make each list flush-pending.
  autovector<ReadOnlyMemTable*> to_delete;
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }

  // Pick memtables up to a per-CF max id: both for CFs 0 and 1, one for CF 2.
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(flush_memtable_ids[i], &flush_candidates[i]);
    // Ids start at 0, so picking up to id k yields k + 1 memtables.
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }

  // Install the picked memtables atomically across the column families.
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(tmp_lists, tmp_cf_ids,
                                                    to_flush, &to_delete);
  ASSERT_OK(s);

  // Every installed memtable got a file number; the rest stay unflushed.
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  // Tear down the lists and options; collect all memtables for deletion.
  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Verify this thread holds the last reference before deleting.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}
// Fixture variant that turns on user-defined timestamps before each test, so
// the base fixture's CreateDB() installs the u64-timestamp comparator on the
// default column family.
class MemTableListWithTimestampTest : public MemTableListTest {
 public:
  MemTableListWithTimestampTest() = default;

  void SetUp() override { udt_enabled_ = true; }
};
TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
  const int num_tables = 3;
  const int num_entries = 5;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  // Configure in-memory-only user-defined timestamps.
  options.persist_user_defined_timestamps = false;
  ImmutableOptions ioptions(options);
  const Comparator* ucmp = test::BytewiseComparatorWithU64TsWrapper();
  InternalKeyComparator cmp(ucmp);
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create the MemTableList.
  int min_write_buffer_number_to_merge = 1;
  int64_t max_write_buffer_size_to_maintain =
      4 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  uint64_t current_ts = 0;
  autovector<ReadOnlyMemTable*> to_delete;
  std::vector<std::string> newest_udts;

  std::string key;
  std::string write_ts;
  // Each memtable gets num_entries versions of "key1" with strictly
  // increasing u64 timestamps; the last timestamp written into a table is
  // that table's newest UDT.
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    for (int j = 0; j < num_entries; j++) {
      key = "key1";
      write_ts.clear();
      PutFixed64(&write_ts, current_ts);
      key.append(write_ts);  // user key + fixed64-encoded timestamp suffix
      ASSERT_OK(mem->Add(++seq, kTypeValue, key, std::to_string(i),
                         nullptr /* kv_prot_info */));
      current_ts++;
    }
    tables.push_back(mem);
    list.Add(tables.back(), &to_delete);
    // Remember the last (largest) timestamp written to this table.
    newest_udts.push_back(write_ts);
  }

  ASSERT_EQ(num_tables, list.NumNotFlushed());
  ASSERT_TRUE(list.IsFlushPending());

  // The list reports each table's newest UDT in table order.
  std::vector<Slice> tables_newest_udts = list.GetTablesNewestUDT(num_tables);
  ASSERT_EQ(newest_udts.size(), tables_newest_udts.size());
  for (size_t i = 0; i < tables_newest_udts.size(); i++) {
    const Slice& table_newest_udt = tables_newest_udts[i];
    const Slice expected_newest_udt = newest_udts[i];
    ASSERT_EQ(expected_newest_udt, table_newest_udt);
  }

  list.current()->Unref(&to_delete);
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
  to_delete.clear();
}
}
// Standard gtest entry point; installs RocksDB's stack-trace handler first so
// a crashing test prints a symbolized backtrace.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}