#include <cassert>
#include <iostream>
#include <memory>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/write_batch_internal.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/wide_columns.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
bool use_compression;
class MergeTest : public testing::Test {};
size_t num_merge_operator_calls;
void resetNumMergeOperatorCalls() { num_merge_operator_calls = 0; }
size_t num_partial_merge_calls;
void resetNumPartialMergeCalls() { num_partial_merge_calls = 0; }
class CountMergeOperator : public AssociativeMergeOperator {
public:
CountMergeOperator() {
mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
}
bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
std::string* new_value, Logger* logger) const override {
assert(new_value->empty());
++num_merge_operator_calls;
if (existing_value == nullptr) {
new_value->assign(value.data(), value.size());
return true;
}
return mergeOperator_->PartialMerge(key, *existing_value, value, new_value,
logger);
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
assert(new_value->empty());
++num_partial_merge_calls;
return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
logger);
}
const char* Name() const override { return "UInt64AddOperator"; }
private:
std::shared_ptr<MergeOperator> mergeOperator_;
};
class EnvMergeTest : public EnvWrapper {
public:
EnvMergeTest() : EnvWrapper(Env::Default()) {}
static const char* kClassName() { return "MergeEnv"; }
const char* Name() const override { return kClassName(); }
uint64_t NowNanos() override {
++now_nanos_count_;
return target()->NowNanos();
}
static uint64_t now_nanos_count_;
static std::unique_ptr<EnvMergeTest> singleton_;
static EnvMergeTest* GetInstance() {
if (nullptr == singleton_) {
singleton_.reset(new EnvMergeTest);
}
return singleton_.get();
}
};
uint64_t EnvMergeTest::now_nanos_count_{0};
std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_;
std::unique_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
const size_t max_successive_merges = 0) {
std::unique_ptr<DB> db;
Options options;
options.create_if_missing = true;
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
options.env = EnvMergeTest::GetInstance();
EXPECT_OK(DestroyDB(dbname, Options()));
Status s;
if (ttl) {
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options, dbname, &db_with_ttl);
db.reset(db_with_ttl);
} else {
s = DB::Open(options, dbname, &db);
}
EXPECT_OK(s);
assert(s.ok());
EnvMergeTest::now_nanos_count_ = 0;
return db;
}
class Counters {
protected:
UnownedPtr<DB> db_;
WriteOptions put_option_;
ReadOptions get_option_;
WriteOptions delete_option_;
uint64_t default_;
public:
explicit Counters(UnownedPtr<DB> db, uint64_t defaultCount = 0)
: db_(db),
put_option_(),
get_option_(),
delete_option_(),
default_(defaultCount) {
assert(db_);
}
virtual ~Counters() = default;
bool set(const std::string& key, uint64_t value) {
char buf[sizeof(value)];
EncodeFixed64(buf, value);
Slice slice(buf, sizeof(value));
auto s = db_->Put(put_option_, key, slice);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
bool remove(const std::string& key) {
auto s = db_->Delete(delete_option_, key);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
bool get(const std::string& key, uint64_t* value) {
std::string str;
auto s = db_->Get(get_option_, key, &str);
if (s.IsNotFound()) {
*value = default_;
return true;
} else if (s.ok()) {
if (str.size() != sizeof(uint64_t)) {
std::cerr << "value corruption\n";
return false;
}
*value = DecodeFixed64(str.data());
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
virtual bool add(const std::string& key, uint64_t value) {
uint64_t base = default_;
return get(key, &base) && set(key, base + value);
}
void assert_set(const std::string& key, uint64_t value) {
assert(set(key, value));
}
void assert_remove(const std::string& key) { assert(remove(key)); }
uint64_t assert_get(const std::string& key) {
uint64_t value = default_;
int result = get(key, &value);
assert(result);
if (result == 0) {
exit(1); }
return value;
}
void assert_add(const std::string& key, uint64_t value) {
int result = add(key, value);
assert(result);
if (result == 0) {
exit(1); }
}
};
class MergeBasedCounters : public Counters {
private:
WriteOptions merge_option_;
public:
explicit MergeBasedCounters(UnownedPtr<DB> db, uint64_t defaultCount = 0)
: Counters(db, defaultCount), merge_option_() {}
bool add(const std::string& key, uint64_t value) override {
char encoded[sizeof(uint64_t)];
EncodeFixed64(encoded, value);
Slice slice(encoded, sizeof(uint64_t));
auto s = db_->Merge(merge_option_, key, slice);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
};
void dumpDb(const std::unique_ptr<DB>& db) {
auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
assert(it->status().ok()); }
void testCounters(Counters& counters, const std::unique_ptr<DB>& db,
bool test_compaction) {
FlushOptions o;
o.wait = true;
counters.assert_set("a", 1);
if (test_compaction) {
ASSERT_OK(db->Flush(o));
}
ASSERT_EQ(counters.assert_get("a"), 1);
counters.assert_remove("b");
ASSERT_EQ(counters.assert_get("b"), 0);
counters.assert_add("a", 2);
if (test_compaction) {
ASSERT_OK(db->Flush(o));
}
ASSERT_EQ(counters.assert_get("a"), 3);
dumpDb(db);
uint64_t sum = 0;
for (int i = 1; i < 50; i++) {
counters.assert_add("b", i);
sum += i;
}
ASSERT_EQ(counters.assert_get("b"), sum);
dumpDb(db);
if (test_compaction) {
ASSERT_OK(db->Flush(o));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
dumpDb(db);
ASSERT_EQ(counters.assert_get("a"), 3);
ASSERT_EQ(counters.assert_get("b"), sum);
}
}
void testCountersWithFlushAndCompaction(Counters& counters,
const std::unique_ptr<DB>& db) {
ASSERT_OK(db->Put({}, "1", "1"));
ASSERT_OK(db->Flush(FlushOptions()));
std::atomic<int> cnt{0};
const auto get_thread_id = [&cnt]() {
thread_local int thread_id{cnt++};
return thread_id;
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* ) {
int thread_id = get_thread_id();
if (1 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_compact_thread:0");
} else if (2 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:0");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void* ) {
int thread_id = get_thread_id();
if (0 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:0");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:1");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
auto* mutex = static_cast<InstrumentedMutex*>(arg);
mutex->AssertHeld();
int thread_id = get_thread_id();
ASSERT_EQ(2, thread_id);
mutex->Unlock();
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:1");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:2");
mutex->Lock();
});
SyncPoint::GetInstance()->LoadDependency({
{"testCountersWithFlushAndCompaction::set_options_thread:0",
"testCountersWithCompactionAndFlush:BeforeCompact"},
{"testCountersWithFlushAndCompaction::bg_compact_thread:0",
"testCountersWithFlushAndCompaction:BeforeIncCounters"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:0",
"testCountersWithFlushAndCompaction::set_options_thread:1"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:1",
"testCountersWithFlushAndCompaction:BeforeVerification"},
{"testCountersWithFlushAndCompaction:AfterGet",
"testCountersWithFlushAndCompaction::bg_flush_thread:2"},
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SetOptions:dummy_edit", [&](void* arg) {
auto* dummy_edit = static_cast<VersionEdit*>(arg);
dummy_edit->Clear();
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread set_options_thread([&]() {
ASSERT_OK(static_cast_with_check<DBImpl>(db.get())->SetOptions(
{{"disable_auto_compactions", "false"}}));
});
TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
port::Thread compact_thread([&]() {
ASSERT_OK(static_cast_with_check<DBImpl>(db.get())->CompactRange(
CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
});
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
counters.add("test-key", 1);
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db->Flush(flush_opts));
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
std::string expected;
PutFixed64(&expected, 1);
std::string actual;
Status s = db->Get(ReadOptions(), "test-key", &actual);
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
set_options_thread.join();
compact_thread.join();
ASSERT_OK(s);
ASSERT_EQ(expected, actual);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
size_t num_merges) {
counters.assert_remove("z");
uint64_t sum = 0;
for (size_t i = 1; i <= num_merges; ++i) {
resetNumMergeOperatorCalls();
counters.assert_add("z", i);
sum += i;
if (i % (max_num_merges + 1) == 0) {
ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
} else {
ASSERT_EQ(num_merge_operator_calls, 0);
}
resetNumMergeOperatorCalls();
ASSERT_EQ(counters.assert_get("z"), sum);
ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
}
}
void testPartialMerge(Counters* counters, const std::unique_ptr<DB>& db,
size_t max_merge, size_t min_merge, size_t count) {
FlushOptions o;
o.wait = true;
uint64_t tmp_sum = 0;
resetNumPartialMergeCalls();
for (size_t i = 1; i <= count; i++) {
counters->assert_add("b", i);
tmp_sum += i;
}
ASSERT_OK(db->Flush(o));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) {
ASSERT_EQ(num_partial_merge_calls, 0U);
} else {
ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
}
resetNumPartialMergeCalls();
tmp_sum = 0;
ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
for (size_t i = 1; i <= count; i++) {
counters->assert_add("c", i);
tmp_sum += i;
}
ASSERT_OK(db->Flush(o));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0U);
ASSERT_EQ(EnvMergeTest::now_nanos_count_, 0U);
}
void testSingleBatchSuccessiveMerge(const std::unique_ptr<DB>& db,
size_t max_num_merges, size_t num_merges) {
ASSERT_GT(num_merges, max_num_merges);
Slice key("BatchSuccessiveMerge");
uint64_t merge_value = 1;
char buf[sizeof(merge_value)];
EncodeFixed64(buf, merge_value);
Slice merge_value_slice(buf, sizeof(merge_value));
WriteBatch batch;
for (size_t i = 0; i < num_merges; ++i) {
ASSERT_OK(batch.Merge(key, merge_value_slice));
}
resetNumMergeOperatorCalls();
ASSERT_OK(db->Write(WriteOptions(), &batch));
ASSERT_EQ(
num_merge_operator_calls,
static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
resetNumMergeOperatorCalls();
std::string get_value_str;
ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
assert(get_value_str.size() == sizeof(uint64_t));
uint64_t get_value = DecodeFixed64(get_value_str.data());
ASSERT_EQ(get_value, num_merges * merge_value);
ASSERT_EQ(num_merge_operator_calls,
static_cast<size_t>((num_merges % (max_num_merges + 1))));
}
void runTest(const std::string& dbname, const bool use_ttl = false) {
{
auto db = OpenDb(dbname, use_ttl);
{
Counters counters(db.get(), 0);
testCounters(counters, db, true);
}
{
MergeBasedCounters counters(db.get(), 0);
testCounters(counters, db, use_compression);
}
}
ASSERT_OK(DestroyDB(dbname, Options()));
{
size_t max_merge = 5;
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db.get(), 0);
testCounters(counters, db, use_compression);
testSuccessiveMerge(counters, max_merge, max_merge * 2);
testSingleBatchSuccessiveMerge(db, 5, 7);
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
{
size_t max_merge = 100;
uint32_t min_merge = 2;
for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db.get(), 0);
testPartialMerge(&counters, db, max_merge, min_merge, count);
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
{
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db.get(), 0);
testPartialMerge(&counters, db, max_merge, min_merge, min_merge * 10);
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
}
{
{
auto db = OpenDb(dbname);
MergeBasedCounters counters(db.get(), 0);
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
std::unique_ptr<DB> reopen_db;
ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
std::string value;
ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
reopen_db.reset();
ASSERT_OK(DestroyDB(dbname, Options()));
}
}
TEST_F(MergeTest, MergeDbTest) {
runTest(test::PerThreadDBPath("merge_testdb"));
}
TEST_F(MergeTest, MergeDbTtlTest) {
runTest(test::PerThreadDBPath("merge_testdbttl"),
true); }
TEST_F(MergeTest, MergeWithCompactionAndFlush) {
const std::string dbname =
test::PerThreadDBPath("merge_with_compaction_and_flush");
{
auto db = OpenDb(dbname);
{
MergeBasedCounters counters(db.get(), 0);
testCountersWithFlushAndCompaction(counters, db);
}
}
ASSERT_OK(DestroyDB(dbname, Options()));
}
TEST_F(MergeTest, FullMergeV3FallbackNewValue) {
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
auto append_operator =
MergeOperators::CreateStringAppendOperator(std::string());
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<std::string>(merge_out.new_value);
ASSERT_EQ(result, operands[0].ToString() + operands[1].ToString() +
operands[2].ToString());
}
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<std::string>(merge_out.new_value);
ASSERT_EQ(result, plain.ToString() + operands[0].ToString() +
operands[1].ToString() + operands[2].ToString());
}
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size());
ASSERT_EQ(result[0].first, entity[0].name());
ASSERT_EQ(result[0].second,
entity[0].value().ToString() + operands[0].ToString() +
operands[1].ToString() + operands[2].ToString());
ASSERT_EQ(result[1].first, entity[1].name());
ASSERT_EQ(result[1].second, entity[1].value());
ASSERT_EQ(result[2].first, entity[2].name());
ASSERT_EQ(result[2].second, entity[2].value());
}
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size() + 1);
ASSERT_EQ(result[0].first, kDefaultWideColumnName);
ASSERT_EQ(result[0].second, operands[0].ToString() +
operands[1].ToString() +
operands[2].ToString());
ASSERT_EQ(result[1].first, entity[0].name());
ASSERT_EQ(result[1].second, entity[0].value());
ASSERT_EQ(result[2].first, entity[1].name());
ASSERT_EQ(result[2].second, entity[1].value());
}
}
TEST_F(MergeTest, FullMergeV3FallbackExistingOperand) {
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
auto put_operator = MergeOperators::CreatePutOperator();
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<Slice>(merge_out.new_value);
ASSERT_EQ(result.data(), operands.back().data());
ASSERT_EQ(result.size(), operands.back().size());
}
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<Slice>(merge_out.new_value);
ASSERT_EQ(result.data(), operands.back().data());
ASSERT_EQ(result.size(), operands.back().size());
}
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size());
ASSERT_EQ(result[0].first, entity[0].name());
ASSERT_EQ(result[0].second, operands.back());
ASSERT_EQ(result[1].first, entity[1].name());
ASSERT_EQ(result[1].second, entity[1].value());
ASSERT_EQ(result[2].first, entity[2].name());
ASSERT_EQ(result[2].second, entity[2].value());
}
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size() + 1);
ASSERT_EQ(result[0].first, kDefaultWideColumnName);
ASSERT_EQ(result[0].second, operands.back());
ASSERT_EQ(result[1].first, entity[0].name());
ASSERT_EQ(result[1].second, entity[0].value());
ASSERT_EQ(result[2].first, entity[1].name());
ASSERT_EQ(result[2].second, entity[1].value());
}
}
TEST_F(MergeTest, FullMergeV3FallbackFailure) {
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
class FailMergeOperator : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& ,
MergeOperationOutput* merge_out) const override {
assert(merge_out);
merge_out->op_failure_scope = OpFailureScope::kMustMerge;
return false;
}
const char* Name() const override { return "FailMergeOperator"; }
};
FailMergeOperator fail_operator;
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
}
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::use_compression = false;
if (argc > 1) {
ROCKSDB_NAMESPACE::use_compression = true;
}
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}