#include "db/compaction/compaction_iterator.h"
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "port/port.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "util/vector_iterator.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
std::string ValueWithPreferredSeqno(std::string val,
SequenceNumber preferred_seqno = 0) {
std::string result = val;
PutFixed64(&result, preferred_seqno);
return result;
}
}
class NoMergingMergeOp : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& ,
MergeOperationOutput* ) const override {
ADD_FAILURE();
return false;
}
bool PartialMergeMulti(const Slice& ,
const std::deque<Slice>& ,
std::string* ,
Logger* ) const override {
ADD_FAILURE();
return false;
}
const char* Name() const override {
return "CompactionIteratorTest NoMergingMergeOp";
}
};
class StallingFilter : public CompactionFilter {
public:
Decision FilterV2(int , const Slice& key, ValueType ,
const Slice& , std::string* ,
std::string* ) const override {
int k = std::atoi(key.ToString().c_str());
last_seen.store(k);
while (k >= stall_at.load()) {
std::this_thread::yield();
}
return Decision::kRemove;
}
const char* Name() const override {
return "CompactionIteratorTest StallingFilter";
}
void WaitForStall(int k, bool exact = true) {
stall_at.store(k);
while (last_seen.load() < k) {
std::this_thread::yield();
}
if (exact) {
EXPECT_EQ(k, last_seen.load());
}
}
mutable std::atomic<int> stall_at{0};
mutable std::atomic<int> last_seen{0};
};
class FilterAllKeysCompactionFilter : public CompactionFilter {
public:
Decision FilterV2(int , const Slice& , ValueType ,
const Slice& , std::string* ,
std::string* ) const override {
return Decision::kRemove;
}
const char* Name() const override { return "AllKeysCompactionFilter"; }
};
class LoggingForwardVectorIterator : public VectorIterator {
public:
struct Action {
enum class Type {
SEEK_TO_FIRST,
SEEK,
NEXT,
};
Type type;
std::string arg;
explicit Action(Type _type, std::string _arg = "")
: type(_type), arg(_arg) {}
bool operator==(const Action& rhs) const {
return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
}
};
LoggingForwardVectorIterator(const std::vector<std::string>& keys,
const std::vector<std::string>& values)
: VectorIterator(keys, values) {
current_ = keys_.size();
}
void SeekToFirst() override {
log.emplace_back(Action::Type::SEEK_TO_FIRST);
VectorIterator::SeekToFirst();
}
void SeekToLast() override { assert(false); }
void Seek(const Slice& target) override {
log.emplace_back(Action::Type::SEEK, target.ToString());
VectorIterator::Seek(target);
}
void SeekForPrev(const Slice& ) override { assert(false); }
void Next() override {
assert(Valid());
log.emplace_back(Action::Type::NEXT);
VectorIterator::Next();
}
void Prev() override { assert(false); }
Slice key() const override {
assert(Valid());
return VectorIterator::key();
}
Slice value() const override {
assert(Valid());
return VectorIterator::value();
}
std::vector<Action> log;
};
class FakeCompaction : public CompactionIterator::CompactionProxy {
public:
int level() const override { return 0; }
bool KeyNotExistsBeyondOutputLevel(
const Slice& ,
std::vector<size_t>* ) const override {
return is_bottommost_level || key_not_exists_beyond_output_level;
}
bool bottommost_level() const override { return is_bottommost_level; }
int number_levels() const override { return 1; }
Slice GetLargestUserKey() const override {
return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
}
bool allow_ingest_behind() const override { return is_allow_ingest_behind; }
bool allow_mmap_reads() const override { return false; }
bool enable_blob_garbage_collection() const override { return false; }
double blob_garbage_collection_age_cutoff() const override { return 0.0; }
uint64_t blob_compaction_readahead_size() const override { return 0; }
const Version* input_version() const override { return nullptr; }
bool DoesInputReferenceBlobFiles() const override { return false; }
const Compaction* real_compaction() const override { return nullptr; }
bool SupportsPerKeyPlacement() const override {
return supports_per_key_placement;
}
bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false;
bool is_allow_ingest_behind = false;
bool supports_per_key_placement = false;
};
class TestSnapshotChecker : public SnapshotChecker {
public:
explicit TestSnapshotChecker(
SequenceNumber last_committed_sequence,
const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots =
{{}})
: last_committed_sequence_(last_committed_sequence),
snapshots_(snapshots) {}
SnapshotCheckerResult CheckInSnapshot(
SequenceNumber seq, SequenceNumber snapshot_seq) const override {
if (snapshot_seq == kMaxSequenceNumber) {
return seq <= last_committed_sequence_
? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
assert(snapshots_.count(snapshot_seq) > 0);
return seq <= snapshots_.at(snapshot_seq)
? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
private:
SequenceNumber last_committed_sequence_;
std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
};
class CompactionIteratorTest : public testing::TestWithParam<bool> {
public:
CompactionIteratorTest()
: cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
explicit CompactionIteratorTest(const Comparator* ucmp)
: cmp_(ucmp), icmp_(cmp_), snapshots_({}) {}
void InitIterators(
const std::vector<std::string>& ks, const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks,
const std::vector<std::string>& range_del_vs,
SequenceNumber last_sequence,
SequenceNumber last_committed_sequence = kMaxSequenceNumber,
MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
bool key_not_exists_beyond_output_level = false,
const std::string* full_history_ts_low = nullptr) {
std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
new VectorIterator(range_del_ks, range_del_vs, &icmp_));
auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
std::move(unfragmented_range_del_iter), icmp_);
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
kMaxSequenceNumber));
range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
range_del_agg_->AddTombstones(std::move(range_del_iter));
std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
if (filter || bottommost_level || key_not_exists_beyond_output_level) {
compaction_proxy_ = new FakeCompaction();
compaction_proxy_->is_bottommost_level = bottommost_level;
compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
compaction_proxy_->key_not_exists_beyond_output_level =
key_not_exists_beyond_output_level;
compaction_proxy_->supports_per_key_placement = SupportsPerKeyPlacement();
compaction.reset(compaction_proxy_);
}
bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
snapshot_checker_.reset(
new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
}
merge_helper_.reset(
new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
0 , snapshot_checker_.get(),
0 , nullptr , &shutting_down_));
if (c_iter_) {
c_iter_.reset();
}
iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
snapshots_.empty() ? kMaxSequenceNumber : snapshots_.at(0),
earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker_.get(), Env::Default(),
false , range_del_agg_.get(),
nullptr , true ,
true ,
kManualCompactionCanceledFalse_,
std::move(compaction), false, filter,
&shutting_down_, nullptr, full_history_ts_low));
}
void AddSnapshot(SequenceNumber snapshot,
SequenceNumber last_visible_seq = kMaxSequenceNumber) {
snapshots_.push_back(snapshot);
snapshot_map_[snapshot] = last_visible_seq;
}
virtual bool UseSnapshotChecker() const { return false; }
virtual bool AllowIngestBehind() const { return false; }
virtual bool SupportsPerKeyPlacement() const { return false; }
void RunTest(
const std::vector<std::string>& input_keys,
const std::vector<std::string>& input_values,
const std::vector<std::string>& expected_keys,
const std::vector<std::string>& expected_values,
SequenceNumber last_committed_seq = kMaxSequenceNumber,
MergeOperator* merge_operator = nullptr,
CompactionFilter* compaction_filter = nullptr,
bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
bool key_not_exists_beyond_output_level = false,
const std::string* full_history_ts_low = nullptr) {
InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
last_committed_seq, merge_operator, compaction_filter,
bottommost_level, earliest_write_conflict_snapshot,
key_not_exists_beyond_output_level, full_history_ts_low);
c_iter_->SeekToFirst();
for (size_t i = 0; i < expected_keys.size(); i++) {
std::string info = "i = " + std::to_string(i);
ASSERT_TRUE(c_iter_->Valid()) << info;
ASSERT_OK(c_iter_->status()) << info;
ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
c_iter_->Next();
}
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
void ClearSnapshots() {
snapshots_.clear();
snapshot_map_.clear();
}
const Comparator* cmp_;
const InternalKeyComparator icmp_;
std::vector<SequenceNumber> snapshots_;
std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
std::unique_ptr<MergeHelper> merge_helper_;
std::unique_ptr<LoggingForwardVectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
std::unique_ptr<SnapshotChecker> snapshot_checker_;
std::atomic<bool> shutting_down_{false};
const std::atomic<bool> kManualCompactionCanceledFalse_{false};
FakeCompaction* compaction_proxy_;
};
TEST_P(CompactionIteratorTest, EmptyResult) {
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue)},
{"", "val"}, {}, {}, 5);
c_iter_->SeekToFirst();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue, true),
test::KeyStr("b", 10, kTypeValue)},
{"", "val", "val2"}, {}, {}, 10);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
c_iter_->key().ToString());
c_iter_->Next();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_FALSE(c_iter_->status().ok());
ASSERT_TRUE(c_iter_->status().IsCorruption());
}
TEST_P(CompactionIteratorTest, TimedPutAndSingleDelete) {
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValuePreferredSeqno)},
{"", "val"}, {}, {}, 5);
c_iter_->SeekToFirst();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
InitIterators({test::KeyStr("morning", 5, kTypeValue),
test::KeyStr("morning", 2, kTypeValue),
test::KeyStr("night", 3, kTypeValue)},
{"zao", "zao", "wan"},
{test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 3, kTypeValue), c_iter_->key().ToString());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
AddSnapshot(10);
std::vector<std::string> ks1;
ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
std::vector<std::string> vs1{"mz"};
std::vector<std::string> ks2{test::KeyStr("morning", 15, kTypeValue),
test::KeyStr("morning", 5, kTypeValue),
test::KeyStr("night", 40, kTypeValue),
test::KeyStr("night", 20, kTypeValue)};
std::vector<std::string> vs2{"zao 15", "zao 5", "wan 40", "wan 20"};
InitIterators(ks2, vs2, ks1, vs1, 40);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 40, kTypeValue), c_iter_->key().ToString());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(CompactionIteratorTest, TimedPutAndRangeDeletion) {
InitIterators(
{test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
test::KeyStr("morning", 2, kTypeValuePreferredSeqno),
test::KeyStr("night", 3, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("zao5"), ValueWithPreferredSeqno("zao2"),
ValueWithPreferredSeqno("wan")},
{test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
c_iter_->key().ToString());
ASSERT_EQ(ValueWithPreferredSeqno("zao5"), c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 3, kTypeValuePreferredSeqno),
c_iter_->key().ToString());
ASSERT_EQ(ValueWithPreferredSeqno("wan"), c_iter_->value().ToString());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
class Filter : public CompactionFilter {
Decision FilterV2(int , const Slice& key, ValueType t,
const Slice& existing_value, std::string* ,
std::string* skip_until) const override {
std::string k = key.ToString();
std::string v = existing_value.ToString();
if (k == "a") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("av50", v);
return Decision::kKeep;
}
if (k == "b") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("bv60", v);
*skip_until = "d+";
return Decision::kRemoveAndSkipUntil;
}
if (k == "e") {
EXPECT_EQ(ValueType::kMergeOperand, t);
EXPECT_EQ("em71", v);
return Decision::kKeep;
}
if (k == "f") {
if (v == "fm65") {
EXPECT_EQ(ValueType::kMergeOperand, t);
*skip_until = "f";
} else {
EXPECT_EQ("fm30", v);
EXPECT_EQ(ValueType::kMergeOperand, t);
*skip_until = "g+";
}
return Decision::kRemoveAndSkipUntil;
}
if (k == "h") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("hv91", v);
return Decision::kKeep;
}
if (k == "i") {
EXPECT_EQ(ValueType::kMergeOperand, t);
EXPECT_EQ("im95", v);
*skip_until = "z";
return Decision::kRemoveAndSkipUntil;
}
ADD_FAILURE();
return Decision::kKeep;
}
const char* Name() const override {
return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
}
};
NoMergingMergeOp merge_op;
Filter filter;
InitIterators(
{test::KeyStr("a", 50, kTypeValue), test::KeyStr("a", 45, kTypeMerge),
test::KeyStr("b", 60, kTypeValue), test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
test::KeyStr("d", 70, kTypeMerge),
test::KeyStr("e", 71, kTypeMerge), test::KeyStr("f", 65, kTypeMerge), test::KeyStr("f", 30, kTypeMerge), test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
test::KeyStr("h", 91, kTypeValue), test::KeyStr("i", 95, kTypeMerge), test::KeyStr("j", 99, kTypeValue),
test::KeyStr("k", 100, kTypeValuePreferredSeqno)},
{"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
"fv25", "gv90", "hv91", "im95", "jv99",
ValueWithPreferredSeqno("kv100")},
{}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("av50", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
ASSERT_EQ("em71", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("hv91", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
using A = LoggingForwardVectorIterator::Action;
using T = A::Type;
std::vector<A> expected_actions = {
A(T::SEEK_TO_FIRST),
A(T::NEXT),
A(T::NEXT),
A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
A(T::NEXT),
A(T::NEXT),
A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
A(T::NEXT),
A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
ASSERT_EQ(expected_actions, iter_->log);
}
TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
&merge_op, &filter);
compaction_proxy_->key_not_exists_beyond_output_level = true;
std::atomic<bool> seek_done{false};
ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
c_iter_->SeekToFirst();
EXPECT_FALSE(c_iter_->Valid());
EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
seek_done.store(true);
});
filter.WaitForStall(1);
filter.WaitForStall(2);
shutting_down_.store(true);
EXPECT_FALSE(seek_done.load());
filter.stall_at.store(3);
compaction_thread.join();
assert(seek_done.load());
EXPECT_EQ(2, filter.last_seen.load());
}
TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
&merge_op, &filter);
compaction_proxy_->key_not_exists_beyond_output_level = true;
std::atomic<bool> seek_done{false};
ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
c_iter_->SeekToFirst();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());
seek_done.store(true);
});
filter.WaitForStall(1);
filter.WaitForStall(2);
shutting_down_.store(true);
EXPECT_FALSE(seek_done.load());
filter.stall_at.store(3);
compaction_thread.join();
assert(seek_done.load());
EXPECT_EQ(2, filter.last_seen.load());
}
class Filter : public CompactionFilter {
Decision FilterV2(int , const Slice& key, ValueType t,
const Slice& existing_value, std::string* ,
std::string* ) const override {
std::string k = key.ToString();
std::string v = existing_value.ToString();
if (k == "a") {
EXPECT_EQ(ValueType::kMergeOperand, t);
EXPECT_EQ("av1", v);
return Decision::kKeep;
} else if (k == "b") {
EXPECT_EQ(ValueType::kMergeOperand, t);
return Decision::kKeep;
} else if (k == "c") {
return Decision::kKeep;
}
ADD_FAILURE();
return Decision::kKeep;
}
const char* Name() const override {
return "CompactionIteratorTest.SingleMergeOperand::Filter";
}
};
class SingleMergeOp : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
EXPECT_EQ("c", merge_in.key.ToString());
std::string temp_value;
if (merge_in.existing_value != nullptr) {
temp_value = merge_in.existing_value->ToString();
}
for (auto& operand : merge_in.operand_list) {
temp_value.append(operand.ToString());
}
merge_out->new_value = temp_value;
return true;
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* ) const override {
std::string string_key = key.ToString();
EXPECT_TRUE(string_key == "a" || string_key == "b");
if (string_key == "a") {
EXPECT_EQ(1, operand_list.size());
} else if (string_key == "b") {
EXPECT_EQ(2, operand_list.size());
}
std::string temp_value;
for (auto& operand : operand_list) {
temp_value.append(operand.ToString());
}
swap(temp_value, *new_value);
return true;
}
const char* Name() const override {
return "CompactionIteratorTest SingleMergeOp";
}
bool AllowSingleOperand() const override { return true; }
};
TEST_P(CompactionIteratorTest, SingleMergeOperand) {
SingleMergeOp merge_op;
Filter filter;
InitIterators(
{test::KeyStr("a", 50, kTypeMerge),
test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
{"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
kMaxSequenceNumber, &merge_op, &filter);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
ASSERT_EQ("av1", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}
TEST_P(CompactionIteratorTest, TimedPutAndMerge) {
SingleMergeOp merge_op;
Filter filter;
InitIterators({test::KeyStr("c", 90, kTypeMerge),
test::KeyStr("c", 80, kTypeValuePreferredSeqno)},
{"cv2", ValueWithPreferredSeqno("cv1")}, {}, {},
kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("c", 90, kTypeValue), c_iter_->key().ToString());
ASSERT_OK(c_iter_->status());
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
AddSnapshot(1);
RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
{"v1", "v2"},
{test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
{"v1", "v2"}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
AddSnapshot(1);
RunTest(
{test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 3, kTypeDeletion),
test::KeyStr("b", 1, kTypeValue)},
{"", "", ""},
{test::KeyStr("b", 3, kTypeDeletion), test::KeyStr("b", 0, kTypeValue)},
{"", ""}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
AddSnapshot(1);
RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
test::KeyStr("b", 2, kTypeSingleDeletion)},
{"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
kMaxSequenceNumber , nullptr ,
nullptr , true );
}
TEST_P(CompactionIteratorTest, ConvertToPutAtBottom) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
RunTest({test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)},
{"a4", "a3", "a2", "b1"},
{test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 0, kTypeValue)},
{"a2,a3,a4", "b1"}, kMaxSequenceNumber ,
merge_op.get(), nullptr ,
true );
}
TEST_P(CompactionIteratorTest, ZeroSeqOfKeyAndSnapshot) {
AddSnapshot(0);
const std::vector<std::string> input_keys = {
test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 0, kTypeValue)};
const std::vector<std::string> input_values = {"a1", "b1"};
RunTest(input_keys, input_values, input_keys, input_values);
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
testing::Values(true, false));
class CompactionIteratorWithSnapshotCheckerTest
: public CompactionIteratorTest {
public:
bool UseSnapshotChecker() const override { return true; }
};
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Value) {
RunTest(
{test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
test::KeyStr("foo", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)},
{"v3", "v2"}, 2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Deletion) {
RunTest({test::KeyStr("foo", 2, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("foo", 2, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"}, 1 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Merge) {
auto merge_op = MergeOperators::CreateStringAppendOperator();
RunTest(
{test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
test::KeyStr("foo", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)},
{"v3", "v1,v2"}, 2 , merge_op.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_SingleDelete) {
RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("foo", 2, kTypeSingleDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"}, 1 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_BlobIndex) {
RunTest({test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex)},
{"v3", "v2"}, 2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v3", "v1"}, 3 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_TimedPut) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("foo", 4, kTypeValuePreferredSeqno),
test::KeyStr("foo", 3, kTypeValuePreferredSeqno),
test::KeyStr("foo", 2, kTypeValuePreferredSeqno),
test::KeyStr("foo", 1, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("v4"), ValueWithPreferredSeqno("v3"),
ValueWithPreferredSeqno("v2"), ValueWithPreferredSeqno("v1")},
{test::KeyStr("foo", 4, kTypeValuePreferredSeqno),
test::KeyStr("foo", 3, kTypeValuePreferredSeqno),
test::KeyStr("foo", 1, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("v4"), ValueWithPreferredSeqno("v3"),
ValueWithPreferredSeqno("v1")},
3 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeDeletion),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v1"}, 3 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
AddSnapshot(2, 1);
AddSnapshot(4, 3);
auto merge_op = MergeOperators::CreateStringAppendOperator();
RunTest(
{test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
test::KeyStr("foo", 1, kTypeValue)},
{"v5", "v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
{"v5", "v4", "v2,v3", "v1"}, 4 , merge_op.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DedupSameSnapshot_SingleDeletion) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeSingleDeletion),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v1"}, 3 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("foo", 4, kTypeBlobIndex),
test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeBlobIndex),
test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v4", "v3", "v1"}, 3 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
test::KeyStr("c", 3, kTypeValue)},
{"v1", "v2", "v3"},
{test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
test::KeyStr("c", 3, kTypeValue)},
{"v1", "v2", "v3"}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
test::KeyStr("c", 3, kTypeDeletion)},
{"", "", ""}, {}, {"", ""}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 4, kTypeDeletion),
test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 3, kTypeValue)},
{"", "", ""},
{test::KeyStr("a", 4, kTypeDeletion),
test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 3, kTypeValue)},
{"", "", ""}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
test::KeyStr("b", 2, kTypeSingleDeletion),
test::KeyStr("c", 3, kTypeSingleDeletion)},
{"", "", ""},
{test::KeyStr("b", 2, kTypeSingleDeletion),
test::KeyStr("c", 3, kTypeSingleDeletion)},
{"", ""}, kMaxSequenceNumber ,
nullptr , nullptr ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
SingleDeleteAcrossSnapshotBoundary) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"}, 2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 , nullptr ,
nullptr , false ,
2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking_BlobIndex) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeBlobIndex)},
{"", "fake_blob_index"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 , nullptr ,
nullptr , false ,
2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking_WideColumnEntity) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeWideColumnEntity)},
{"", "fake_entity"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 , nullptr ,
nullptr , false ,
2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking_TimedPut) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValuePreferredSeqno)},
{"", ValueWithPreferredSeqno("v1")},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 , nullptr ,
nullptr , false ,
2 );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_NotVisibleToEarliestSnapshot_NoSwapPreferredSeqno) {
AddSnapshot(3);
RunTest({test::KeyStr("bar", 5, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("bv2", 2)},
{test::KeyStr("bar", 5, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("bv2", 2), "bv1"}, 5 ,
nullptr , nullptr ,
true ,
kMaxSequenceNumber ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_MoreEntriesInLowerLevels_NoSwapPreferredSeqno) {
RunTest({test::KeyStr("bar", 5, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("bv2", 2)},
{test::KeyStr("bar", 5, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("bv2", 2)}, 5 ,
nullptr , nullptr ,
false ,
kMaxSequenceNumber ,
false );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_ShouldBeCoverredByRangeDeletionBeforeSwap_NoOutput) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
test::KeyStr("morning", 2, kTypeValuePreferredSeqno),
test::KeyStr("night", 6, kTypeValue)},
{ValueWithPreferredSeqno("zao", 3),
ValueWithPreferredSeqno("zao", 1), "wan"},
{test::KeyStr("ma", 6, kTypeRangeDeletion)}, {"mz"}, 6,
kMaxSequenceNumber ,
nullptr , nullptr ,
false ,
kMaxSequenceNumber ,
true );
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 6, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("wan", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_OK(c_iter_->status());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_WillBeHiddenByRangeDeletionAfterSwap_NoSwap) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
test::KeyStr("night", 6, kTypeValue)},
{ValueWithPreferredSeqno("zao", 3), "wan"},
{test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 6,
kMaxSequenceNumber ,
nullptr , nullptr ,
false ,
kMaxSequenceNumber ,
true );
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
c_iter_->key().ToString());
ASSERT_EQ(ValueWithPreferredSeqno("zao", 3), c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 6, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("wan", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_OK(c_iter_->status());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_BottomMostLevelVisibleToEarliestSnapshot_SwapPreferredSeqno) {
RunTest(
{test::KeyStr("bar", 5, kTypeValuePreferredSeqno),
test::KeyStr("bar", 4, kTypeValuePreferredSeqno),
test::KeyStr("foo", 6, kTypeValue)},
{ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1),
"fv1"},
{test::KeyStr("bar", 0, kTypeValue), test::KeyStr("foo", 0, kTypeValue)},
{"bv2", "fv1"}, 6 , nullptr ,
nullptr , true );
}
TEST_F(
CompactionIteratorWithSnapshotCheckerTest,
TimedPut_NonBottomMostLevelVisibleToEarliestSnapshot_SwapPreferredSeqno) {
RunTest(
{test::KeyStr("bar", 5, kTypeValuePreferredSeqno),
test::KeyStr("bar", 4, kTypeValuePreferredSeqno),
test::KeyStr("foo", 6, kTypeValue)},
{ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1),
"fv1"},
{test::KeyStr("bar", 2, kTypeValue), test::KeyStr("foo", 6, kTypeValue)},
{"bv2", "fv1"}, 6 , nullptr ,
nullptr , false ,
kMaxSequenceNumber ,
true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_SequenceNumberAlreadyZeroedOut_ChangeType) {
RunTest(
{test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("foo", 0, kTypeValue)},
{ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1),
"fv1"},
{test::KeyStr("bar", 0, kTypeValue), test::KeyStr("foo", 0, kTypeValue)},
{"bv2", "fv1"}, 6 , nullptr ,
nullptr , true );
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)},
{"v2", "v1", "v3", "v4"},
{test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)},
{"v2", "", "v3", ""}, 1 ,
nullptr , compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_TimedPut) {
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest({test::KeyStr("a", 2, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("v1")},
{test::KeyStr("a", 2, kTypeValuePreferredSeqno)},
{ValueWithPreferredSeqno("v1")}, 2 ,
nullptr , compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeDeletion),
test::KeyStr("a", 1, kTypeDeletion)},
{"", ""}, 1 , nullptr ,
compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
CompactionFilter_PartialMerge) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest({test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
test::KeyStr("a", 1, kTypeMerge)},
{"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge)}, {"v3"},
2 , merge_op.get(), compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
test::KeyStr("a", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)},
{"v3", ""}, 2 , merge_op.get(),
compaction_filter.get());
}
class CompactionIteratorWithAllowIngestBehindTest
: public CompactionIteratorTest {
public:
bool AllowIngestBehind() const override { return true; }
};
TEST_P(CompactionIteratorWithAllowIngestBehindTest, NoConvertToPutAtBottom) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
RunTest({test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)},
{"a4", "a3", "a2", "b1"},
{test::KeyStr("a", 4, kTypeMerge), test::KeyStr("b", 1, kTypeValue)},
{"a2,a3,a4", "b1"}, kMaxSequenceNumber ,
merge_op.get(), nullptr ,
true );
}
TEST_P(CompactionIteratorWithAllowIngestBehindTest,
MergeToPutIfEncounteredPutAtBottom) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
RunTest({test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
test::KeyStr("a", 2, kTypeValue), test::KeyStr("b", 1, kTypeValue)},
{"a4", "a3", "a2", "b1"},
{test::KeyStr("a", 4, kTypeValue), test::KeyStr("b", 1, kTypeValue)},
{"a2,a3,a4", "b1"}, kMaxSequenceNumber ,
merge_op.get(), nullptr ,
true );
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance,
CompactionIteratorWithAllowIngestBehindTest,
testing::Values(true, false));
class CompactionIteratorTsGcTest : public CompactionIteratorTest {
public:
CompactionIteratorTsGcTest()
: CompactionIteratorTest(test::BytewiseComparatorWithU64TsWrapper()) {}
};
TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key[0], 4, kTypeValue),
test::KeyStr(102, user_key[0], 3,
kTypeDeletionWithTimestamp),
test::KeyStr(104, user_key[1], 5, kTypeValue)};
const std::vector<std::string> input_values = {"a3", "", "b2"};
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, NoMergeEligibleForGc) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(10002, user_key, 102, kTypeMerge),
test::KeyStr(10001, user_key, 101, kTypeMerge),
test::KeyStr(10000, user_key, 100, kTypeValue)};
const std::vector<std::string> input_values = {"2", "1", "a0"};
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendTESTOperator();
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber, merge_op.get(),
nullptr, bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level,
nullptr);
}
}
TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key[0], 4,
kTypeDeletionWithTimestamp),
test::KeyStr(102, user_key[0], 3, kTypeValue),
test::KeyStr(101, user_key[0], 2, kTypeValue),
test::KeyStr(104, user_key[1], 5, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1", "b5"};
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
{
AddSnapshot(3);
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[3]};
const std::vector<std::string> expected_values = {"", "a2", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
ClearSnapshots();
}
{
const std::vector<std::string> expected_keys = {input_keys[0],
input_keys[3]};
const std::vector<std::string> expected_values = {"", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
}
{
const std::vector<std::string> expected_keys = {input_keys[3]};
const std::vector<std::string> expected_values = {"b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
true, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, SomeMergesOlderThanThreshold) {
constexpr char user_key[][2] = {"a", "f"};
const std::vector<std::string> input_keys = {
test::KeyStr(25000, user_key[0], 2500, kTypeMerge),
test::KeyStr(19000, user_key[0], 2300, kTypeMerge),
test::KeyStr(18000, user_key[0], 1800, kTypeMerge),
test::KeyStr(16000, user_key[0], 1600, kTypeValue),
test::KeyStr(19000, user_key[1], 2000, kTypeMerge),
test::KeyStr(17000, user_key[1], 1700, kTypeMerge),
test::KeyStr(15000, user_key[1], 1600,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> input_values = {"25", "19", "18", "16",
"19", "17", ""};
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendTESTOperator();
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 20000);
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
{
AddSnapshot(1600);
AddSnapshot(1900);
const std::vector<std::string> expected_keys = {
test::KeyStr(25000, user_key[0], 2500, kTypeMerge),
test::KeyStr(19000, user_key[0], 2300, kTypeMerge),
test::KeyStr(18000, user_key[0], 1800, kTypeMerge),
test::KeyStr(16000, user_key[0], 1600, kTypeValue),
test::KeyStr(19000, user_key[1], 2000, kTypeMerge),
test::KeyStr(17000, user_key[1], 1700, kTypeMerge),
test::KeyStr(15000, user_key[1], 1600,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> expected_values = {"25", "19", "18", "16",
"19", "17", ""};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
auto expected_keys_copy = expected_keys;
auto expected_values_copy = expected_values;
if (bottommost_level || key_not_exists_beyond_output_level) {
expected_keys_copy.pop_back();
expected_values_copy.pop_back();
if (bottommost_level) {
expected_keys_copy[3] =
test::KeyStr(0, user_key[0], 0, kTypeValue);
}
}
RunTest(input_keys, input_values, expected_keys_copy,
expected_values_copy,
kMaxSequenceNumber, merge_op.get(),
nullptr, bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
ClearSnapshots();
}
{
const std::vector<std::string> expected_keys = {
test::KeyStr(25000, user_key[0], 2500, kTypeValue),
test::KeyStr(19000, user_key[1], 2000, kTypeValue)};
const std::vector<std::string> expected_values = {"16,18,19,25", "17,19"};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
auto expected_keys_copy = expected_keys;
auto expected_values_copy = expected_values;
if (bottommost_level) {
expected_keys_copy[1] =
test::KeyStr(0, user_key[1], 0, kTypeValue);
}
RunTest(input_keys, input_values, expected_keys_copy,
expected_values_copy,
kMaxSequenceNumber, merge_op.get(),
nullptr, bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
}
TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key, 4, kTypeDeletionWithTimestamp),
test::KeyStr(102, user_key, 3, kTypeValue),
test::KeyStr(101, user_key, 2, kTypeValue),
test::KeyStr(100, user_key, 1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1", "a0"};
{
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[2]};
const std::vector<std::string> expected_values = {"", input_values[1],
input_values[2]};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, DropTombstones) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key, 4, kTypeDeletionWithTimestamp),
test::KeyStr(102, user_key, 3, kTypeValue),
test::KeyStr(101, user_key, 2, kTypeDeletionWithTimestamp),
test::KeyStr(100, user_key, 1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {input_keys[0], input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
AddSnapshot(2);
{
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
true, &full_history_ts_low);
}
{
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
true,
kMaxSequenceNumber,
false, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key, 4, kTypeDeletionWithTimestamp),
test::KeyStr(102, user_key, 3, kTypeValue),
test::KeyStr(101, user_key, 2, kTypeDeletionWithTimestamp),
test::KeyStr(100, user_key, 1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[2],
test::KeyStr(0, user_key, 0, kTypeValue)};
const std::vector<std::string> expected_values = {"", "a2", "", "a0"};
AddSnapshot(1);
AddSnapshot(2);
{
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
true,
kMaxSequenceNumber,
true, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteNoKeyEligibleForGC) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(104, user_key[0], 4, kTypeSingleDeletion),
test::KeyStr(103, user_key[0], 3, kTypeValue),
test::KeyStr(102, user_key[1], 2, kTypeValue)};
const std::vector<std::string> input_values = {"", "a3", "b2"};
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteDropTombstones) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key, 4, kTypeSingleDeletion),
test::KeyStr(102, user_key, 3, kTypeValue),
test::KeyStr(101, user_key, 2, kTypeSingleDeletion),
test::KeyStr(100, user_key, 1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {input_keys[0], input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
AddSnapshot(2);
{
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
bottommost_level,
kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteAllKeysOlderThanThreshold) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(103, user_key[0], 4, kTypeSingleDeletion),
test::KeyStr(102, user_key[0], 3, kTypeValue),
test::KeyStr(104, user_key[1], 5, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "b5"};
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
{
AddSnapshot(3);
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[2]};
const std::vector<std::string> expected_values = {"", "a2", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
ClearSnapshots();
}
{
const std::vector<std::string> expected_keys = {input_keys[2]};
const std::vector<std::string> expected_values = {"b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, ZeroSeqOfKeyAndSnapshot) {
AddSnapshot(0);
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
const std::vector<std::string> input_keys = {
test::KeyStr(101, "a", 0, kTypeValue),
test::KeyStr(102, "b", 0, kTypeValue)};
const std::vector<std::string> input_values = {"a1", "b1"};
RunTest(input_keys, input_values, input_keys, input_values,
kMaxSequenceNumber,
nullptr, nullptr,
false,
kMaxSequenceNumber,
false, &full_history_ts_low);
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
CompactionIteratorTsGcTest,
testing::Values(true, false));
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}