#include "table/compaction_merging_iterator.h"
#include "db/internal_stats.h"
namespace ROCKSDB_NAMESPACE {
// Forward-only merging iterator that feeds compaction.
//
// Combines `n` child iterators with one optional range tombstone iterator per
// child. Point entries and range tombstone start keys share a single heap that
// is ordered by the internal key comparator. A tombstone start key is surfaced
// as an entry for which IsDeleteRangeSentinelKey() is true and value() is the
// empty `dummy_tombstone_val`.
//
// Only SeekToFirst(), Seek() and Next() reposition the iterator. The backward
// and pinning related entry points are intentionally unsupported and assert.
class CompactionMergingIterator : public InternalIterator {
 public:
  // comparator       orders internal keys inside the heap.
  // children         array of `n` child iterators; each is released in the
  //                  destructor through DeleteIter(is_arena_mode).
  // is_arena_mode    forwarded to DeleteIter() when children are released.
  // range_tombstones must hold exactly `n` entries. `first` is moved into this
  //                  object; a non-null `second` receives the address of the
  //                  slot that now owns the moved iterator.
  // internal_stats   optional; when set, `n` is added to the running
  //                  compaction sorted-run counter for this object's lifetime.
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones,
      InternalStats* internal_stats)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr),
        internal_stats_(internal_stats),
        num_sorted_runs_recorded_(0) {
    // One heap item per child; the child's index doubles as its level.
    children_.resize(n);
    for (int idx = 0; idx < n; ++idx) {
      HeapItem& item = children_[idx];
      item.level = idx;
      item.iter.Set(children[idx]);
      assert(item.type == HeapItem::ITERATOR);
    }

    // Take ownership of every tombstone iterator first. Slot addresses are
    // only handed out in the next loop, once the vector has stopped growing.
    assert(range_tombstones.size() == static_cast<size_t>(n));
    for (auto& entry : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(entry.first));
    }

    pinned_heap_item_.resize(n);
    for (int idx = 0; idx < n; ++idx) {
      auto* slot_out = range_tombstones[idx].second;
      if (slot_out) {
        *slot_out = &range_tombstone_iters_[idx];
      }
      HeapItem& sentinel = pinned_heap_item_[idx];
      sentinel.level = idx;
      sentinel.type = HeapItem::DELETE_RANGE_START;
    }

    if (internal_stats_) {
      TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
      num_sorted_runs_recorded_ = n;
      internal_stats_->IncrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
    }
  }

  // Remembers the first non-OK status seen; later errors do not overwrite it.
  void considerStatus(const Status& s) {
    if (s.ok()) {
      return;
    }
    if (status_.ok()) {
      status_ = s;
    }
  }

  // Undoes the stats accounting done in the constructor, then releases the
  // tombstone iterators followed by the child iterators.
  ~CompactionMergingIterator() override {
    if (internal_stats_) {
      assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
      internal_stats_->DecrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
    }

    range_tombstone_iters_.clear();
    for (auto& item : children_) {
      item.iter.DeleteIter(is_arena_mode_);
    }
    status_.PermitUncheckedError();
  }

  // Valid only while an entry is selected and no error has been recorded.
  bool Valid() const override {
    if (current_ == nullptr) {
      return false;
    }
    return status_.ok();
  }

  Status status() const override { return status_; }

  void SeekToFirst() override;
  void Seek(const Slice& target) override;
  void Next() override;

  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  // Range tombstone start keys carry no payload, so they yield the empty
  // placeholder string instead of a child value.
  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    }
    return dummy_tombstone_val;
  }

  // Tombstone start keys always report "may be out of lower bound"; point
  // entries relay the answer of the child that produced them.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    if (current_->type == HeapItem::DELETE_RANGE_START) {
      return true;
    }
    return current_->iter.MayBeOutOfLowerBound();
  }

  // Tombstone start keys report kUnknown; point entries relay the answer of
  // the child that produced them.
  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    if (current_->type == HeapItem::DELETE_RANGE_START) {
      return IterBoundCheck::kUnknown;
    }
    return current_->iter.UpperBoundCheckResult();
  }

  // Stores the manager and forwards it to every child iterator.
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& item : children_) {
      item.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  // True when the current entry is a range tombstone start key.
  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Everything below is unsupported by this iterator: each call trips an
  // assertion, and the bool-returning ones return false when asserts are off.
  void SeekToLast() override { assert(false); }

  void SeekForPrev(const Slice&) override { assert(false); }

  void Prev() override { assert(false); }

  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }

  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }

  bool IsValuePinned() const override {
    assert(false);
    return false;
  }

  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
  // Heap element. It either wraps a child iterator (ITERATOR) or holds the
  // encoded start key of a range tombstone (DELETE_RANGE_START).
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    // Encoded internal key; only meaningful for DELETE_RANGE_START items.
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    // Replaces the stored tombstone key with the encoding of `pik`.
    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    // Key used for heap ordering: the child's current key for ITERATOR items,
    // the stored tombstone key otherwise.
    [[nodiscard]] Slice key() const {
      if (type == ITERATOR) {
        return iter.key();
      }
      return tombstone_str;
    }
  };

  // Heap ordering functor: reports whether `a` sorts strictly after `b`.
  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      const int order = comparator_->Compare(a->key(), b->key());
      // Items of different types are expected to never compare equal.
      assert(a->type == b->type || order != 0);
      return order > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;

  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // One item per child iterator, indexed by level.
  std::vector<HeapItem> children_;
  // One DELETE_RANGE_START item per level, reused for each tombstone start key.
  std::vector<HeapItem> pinned_heap_item_;
  // Owned tombstone iterators, indexed by level; entries may be null.
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Empty value returned for range tombstone start keys.
  std::string dummy_tombstone_val{};

  // Advances past child entries flagged by IsDeleteRangeSentinelKey() until
  // the heap top is an entry that can be returned, or the heap is empty.
  void FindNextVisibleKey();

  // Entry currently exposed through key()/value(); nullptr when exhausted.
  HeapItem* current_;
  // First error observed from any child iterator.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  InternalStats* internal_stats_;
  // Amount added to the stats counter in the constructor (0 without stats).
  uint64_t num_sorted_runs_recorded_;

  // Pushes `child` when it is valid, otherwise records its status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  // Top of the heap, or nullptr when the heap is empty.
  HeapItem* CurrentForward() const {
    if (minHeap_.empty()) {
      return nullptr;
    }
    return minHeap_.top();
  }

  // Pushes the current tombstone start key of `level` onto the heap, if the
  // tombstone iterator is valid. The caller must ensure that
  // range_tombstone_iters_[level] is non-null.
  void InsertRangeTombstoneAtLevel(size_t level) {
    if (!range_tombstone_iters_[level]->Valid()) {
      return;
    }
    HeapItem& sentinel = pinned_heap_item_[level];
    sentinel.SetTombstoneForCompaction(
        range_tombstone_iters_[level]->start_key());
    minHeap_.push(&sentinel);
  }
};
// Positions the iterator on the first entry across all children and all range
// tombstone iterators.
void CompactionMergingIterator::SeekToFirst() {
  // Start from scratch: empty heap and no remembered error.
  minHeap_.clear();
  status_ = Status::OK();

  for (auto& item : children_) {
    item.iter.SeekToFirst();
    AddToMinHeapOrCheckStatus(&item);
  }

  const size_t num_levels = range_tombstone_iters_.size();
  for (size_t level = 0; level < num_levels; ++level) {
    auto& tombstone_iter = range_tombstone_iters_[level];
    if (!tombstone_iter) {
      continue;  // This level has no range tombstones.
    }
    tombstone_iter->SeekToFirst();
    InsertRangeTombstoneAtLevel(level);
  }

  FindNextVisibleKey();
  current_ = CurrentForward();
}
void CompactionMergingIterator::Seek(const Slice& target) {
minHeap_.clear();
status_ = Status::OK();
for (auto& child : children_) {
child.iter.Seek(target);
AddToMinHeapOrCheckStatus(&child);
}
ParsedInternalKey pik;
ParseInternalKey(target, &pik, false )
.PermitUncheckedError();
for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
if (range_tombstone_iters_[i]) {
range_tombstone_iters_[i]->Seek(pik.user_key);
while (range_tombstone_iters_[i]->Valid() &&
comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
0) {
range_tombstone_iters_[i]->Next();
}
InsertRangeTombstoneAtLevel(i);
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::Next() {
assert(Valid());
assert(current_ == CurrentForward());
if (current_->type == HeapItem::ITERATOR) {
current_->iter.Next();
if (current_->iter.Valid()) {
assert(current_->iter.status().ok());
minHeap_.replace_top(current_);
} else {
considerStatus(current_->iter.status());
minHeap_.pop();
}
} else {
assert(current_->type == HeapItem::DELETE_RANGE_START);
size_t level = current_->level;
assert(range_tombstone_iters_[level]);
range_tombstone_iters_[level]->Next();
if (range_tombstone_iters_[level]->Valid()) {
pinned_heap_item_[level].SetTombstoneForCompaction(
range_tombstone_iters_[level]->start_key());
minHeap_.replace_top(&pinned_heap_item_[level]);
} else {
minHeap_.pop();
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::FindNextVisibleKey() {
while (!minHeap_.empty()) {
HeapItem* current = minHeap_.top();
if (current->type != HeapItem::ITERATOR ||
!current->iter.IsDeleteRangeSentinelKey()) {
return;
}
assert(!range_tombstone_iters_[current->level] ||
!range_tombstone_iters_[current->level]->Valid());
current->iter.Next();
if (current->iter.Valid()) {
assert(current->iter.status().ok());
minHeap_.replace_top(current);
} else {
considerStatus(current->iter.status());
minHeap_.pop();
}
if (range_tombstone_iters_[current->level]) {
InsertRangeTombstoneAtLevel(current->level);
}
}
}
// Adds `child` to the heap when it points at an entry; otherwise records the
// child's status so that a failure is not lost.
void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
  if (!child->iter.Valid()) {
    considerStatus(child->iter.status());
    return;
  }
  assert(child->iter.status().ok());
  minHeap_.push(child);
}
// Factory for CompactionMergingIterator.
//
// Returns an empty iterator when `n` is zero. Otherwise the merging iterator
// is built inside `arena` when one is supplied (arena mode) and on the heap
// when `arena` is null.
InternalIterator* NewCompactionMergingIterator(
    const InternalKeyComparator* comparator, InternalIterator** children, int n,
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
        range_tombstone_iters,
    Arena* arena, InternalStats* stats) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  }

  if (arena != nullptr) {
    // Arena mode: construct in place inside arena-provided storage.
    auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
    return new (mem) CompactionMergingIterator(
        comparator, children, n, true, range_tombstone_iters, stats);
  }

  return new CompactionMergingIterator(comparator, children, n, false,
                                       range_tombstone_iters, stats);
}
}