#include "table/merging_iterator.h"
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "memory/arena.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
#include "test_util/sync_point.h"
#include "util/autovector.h"
#include "util/heap.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
// File-local aliases for the two heaps of child-iterator pointers used by
// MergingIterator: one built on MaxIteratorComparator and one built on
// MinIteratorComparator.
namespace {
using MergerMaxIterHeap = BinaryHeap<IteratorWrapper*, MaxIteratorComparator>;
using MergerMinIterHeap = BinaryHeap<IteratorWrapper*, MinIteratorComparator>;
}  // namespace
// Size parameter passed to the autovector that holds MergingIterator's
// children (see MergingIterator::children_).
constexpr size_t kNumIterReserve = 4;
class MergingIterator : public InternalIterator {
public:
MergingIterator(const InternalKeyComparator* comparator,
InternalIterator** children, int n, bool is_arena_mode,
bool prefix_seek_mode)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
direction_(kForward),
minHeap_(comparator_),
prefix_seek_mode_(prefix_seek_mode),
pinned_iters_mgr_(nullptr) {
children_.resize(n);
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
}
for (auto& child : children_) {
AddToMinHeapOrCheckStatus(&child);
}
current_ = CurrentForward();
}
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = s;
}
}
virtual void AddIterator(InternalIterator* iter) {
assert(direction_ == kForward);
children_.emplace_back(iter);
if (pinned_iters_mgr_) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
auto new_wrapper = children_.back();
AddToMinHeapOrCheckStatus(&new_wrapper);
if (new_wrapper.Valid()) {
current_ = CurrentForward();
}
}
~MergingIterator() override {
for (auto& child : children_) {
child.DeleteIter(is_arena_mode_);
}
status_.PermitUncheckedError();
}
bool Valid() const override { return current_ != nullptr && status_.ok(); }
Status status() const override { return status_; }
void SeekToFirst() override {
ClearHeaps();
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToFirst();
AddToMinHeapOrCheckStatus(&child);
}
direction_ = kForward;
current_ = CurrentForward();
}
void SeekToLast() override {
ClearHeaps();
InitMaxHeap();
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToLast();
AddToMaxHeapOrCheckStatus(&child);
}
direction_ = kReverse;
current_ = CurrentReverse();
}
void Seek(const Slice& target) override {
ClearHeaps();
status_ = Status::OK();
for (auto& child : children_) {
{
PERF_TIMER_GUARD(seek_child_seek_time);
child.Seek(target);
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
{
PERF_TIMER_GUARD(seek_min_heap_time);
AddToMinHeapOrCheckStatus(&child);
}
}
direction_ = kForward;
{
PERF_TIMER_GUARD(seek_min_heap_time);
current_ = CurrentForward();
}
}
void SeekForPrev(const Slice& target) override {
ClearHeaps();
InitMaxHeap();
status_ = Status::OK();
for (auto& child : children_) {
{
PERF_TIMER_GUARD(seek_child_seek_time);
child.SeekForPrev(target);
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
{
PERF_TIMER_GUARD(seek_max_heap_time);
AddToMaxHeapOrCheckStatus(&child);
}
}
direction_ = kReverse;
{
PERF_TIMER_GUARD(seek_max_heap_time);
current_ = CurrentReverse();
}
}
void Next() override {
assert(Valid());
if (direction_ != kForward) {
SwitchToForward();
}
assert(current_ == CurrentForward());
current_->Next();
if (current_->Valid()) {
assert(current_->status().ok());
minHeap_.replace_top(current_);
} else {
considerStatus(current_->status());
minHeap_.pop();
}
current_ = CurrentForward();
}
bool NextAndGetResult(IterateResult* result) override {
Next();
bool is_valid = Valid();
if (is_valid) {
result->key = key();
result->bound_check_result = UpperBoundCheckResult();
result->value_prepared = current_->IsValuePrepared();
}
return is_valid;
}
void Prev() override {
assert(Valid());
if (direction_ != kReverse) {
SwitchToBackward();
}
assert(current_ == CurrentReverse());
current_->Prev();
if (current_->Valid()) {
assert(current_->status().ok());
maxHeap_->replace_top(current_);
} else {
considerStatus(current_->status());
maxHeap_->pop();
}
current_ = CurrentReverse();
}
Slice key() const override {
assert(Valid());
return current_->key();
}
Slice value() const override {
assert(Valid());
return current_->value();
}
bool PrepareValue() override {
assert(Valid());
if (current_->PrepareValue()) {
return true;
}
considerStatus(current_->status());
assert(!status_.ok());
return false;
}
bool MayBeOutOfLowerBound() override {
assert(Valid());
return current_->MayBeOutOfLowerBound();
}
IterBoundCheck UpperBoundCheckResult() override {
assert(Valid());
return current_->UpperBoundCheckResult();
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
for (auto& child : children_) {
child.SetPinnedItersMgr(pinned_iters_mgr);
}
}
bool IsKeyPinned() const override {
assert(Valid());
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
current_->IsKeyPinned();
}
bool IsValuePinned() const override {
assert(Valid());
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
current_->IsValuePinned();
}
private:
void ClearHeaps();
void InitMaxHeap();
bool is_arena_mode_;
const InternalKeyComparator* comparator_;
autovector<IteratorWrapper, kNumIterReserve> children_;
IteratorWrapper* current_;
Status status_;
enum Direction {
kForward,
kReverse
};
Direction direction_;
MergerMinIterHeap minHeap_;
bool prefix_seek_mode_;
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;
void AddToMinHeapOrCheckStatus(IteratorWrapper*);
void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
void SwitchToForward();
void SwitchToBackward();
IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward);
return !minHeap_.empty() ? minHeap_.top() : nullptr;
}
IteratorWrapper* CurrentReverse() const {
assert(direction_ == kReverse);
assert(maxHeap_);
return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
}
};
// Pushes `child` onto the min heap when it is positioned on an entry.
// Otherwise nothing is pushed and the child's status is handed to
// considerStatus(), which keeps it only if it is the first error seen.
void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
  if (!child->Valid()) {
    considerStatus(child->status());
    return;
  }
  // A valid child is expected to carry an OK status.
  assert(child->status().ok());
  minHeap_.push(child);
}
// Pushes `child` onto the max heap when it is positioned on an entry.
// Otherwise nothing is pushed and the child's status is handed to
// considerStatus(), which keeps it only if it is the first error seen.
// The max heap must have been created (see InitMaxHeap()) before this call.
void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
  if (!child->Valid()) {
    considerStatus(child->status());
    return;
  }
  // A valid child is expected to carry an OK status.
  assert(child->status().ok());
  maxHeap_->push(child);
}
void MergingIterator::SwitchToForward() {
ClearHeaps();
Slice target = key();
for (auto& child : children_) {
if (&child != current_) {
child.Seek(target);
if (child.Valid() && comparator_->Equal(target, child.key())) {
assert(child.status().ok());
child.Next();
}
}
AddToMinHeapOrCheckStatus(&child);
}
direction_ = kForward;
}
void MergingIterator::SwitchToBackward() {
ClearHeaps();
InitMaxHeap();
Slice target = key();
for (auto& child : children_) {
if (&child != current_) {
child.SeekForPrev(target);
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
if (child.Valid() && comparator_->Equal(target, child.key())) {
assert(child.status().ok());
child.Prev();
}
}
AddToMaxHeapOrCheckStatus(&child);
}
direction_ = kReverse;
if (!prefix_seek_mode_) {
current_ = CurrentReverse();
}
assert(current_ == CurrentReverse());
}
// Empties both heaps. The max heap is created lazily, so it is cleared only
// if it already exists.
void MergingIterator::ClearHeaps() {
  if (maxHeap_ != nullptr) {
    maxHeap_->clear();
  }
  minHeap_.clear();
}
// Allocates the max heap the first time it is needed; later calls are no-ops.
void MergingIterator::InitMaxHeap() {
  if (maxHeap_) {
    return;
  }
  maxHeap_.reset(new MergerMaxIterHeap(comparator_));
}
// Returns an iterator over the `n` iterators in `list`.
//   - n == 0: an empty internal iterator created through `arena`.
//   - n == 1: list[0] itself, without any wrapping.
//   - otherwise: a MergingIterator over all of them. It is placement-new'ed
//     into memory from `arena` (arena mode on) when `arena` is non-null, and
//     heap-allocated (arena mode off) when it is null.
InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
                                     InternalIterator** list, int n,
                                     Arena* arena, bool prefix_seek_mode) {
  assert(n >= 0);
  switch (n) {
    case 0:
      return NewEmptyInternalIterator<Slice>(arena);
    case 1:
      return list[0];
    default:
      break;
  }

  if (arena != nullptr) {
    auto storage = arena->AllocateAligned(sizeof(MergingIterator));
    return new (storage)
        MergingIterator(cmp, list, n, true, prefix_seek_mode);
  }
  return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
}
// Starts with no pending iterator and eagerly constructs an empty
// MergingIterator (zero children, arena mode) inside memory obtained from
// the arena `a`. Children are attached later through AddIterator().
MergeIteratorBuilder::MergeIteratorBuilder(
    const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
    : first_iter(nullptr), use_merging_iter(false), arena(a) {
  auto storage = arena->AllocateAligned(sizeof(MergingIterator));
  merge_iter = new (storage) MergingIterator(comparator, /*children=*/nullptr,
                                             /*n=*/0, /*is_arena_mode=*/true,
                                             prefix_seek_mode);
}
// Destroys whichever iterators Finish() has not handed out. Only the
// destructors are run; no memory is freed here. merge_iter was constructed
// in arena memory by the constructor; first_iter is presumably
// arena-allocated by the caller as well.
MergeIteratorBuilder::~MergeIteratorBuilder() {
  if (first_iter) {
    first_iter->~InternalIterator();
  }
  if (merge_iter) {
    merge_iter->~MergingIterator();
  }
}
// Registers `iter` with the builder. The first iterator is only remembered
// in first_iter. When a second one arrives, the remembered iterator and the
// new one are both moved into merge_iter; from then on every iterator goes
// straight into merge_iter.
void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
  if (use_merging_iter) {
    // Already merging: just attach.
    merge_iter->AddIterator(iter);
    return;
  }

  if (first_iter == nullptr) {
    // Nothing pending yet: hold on to it without involving merge_iter.
    first_iter = iter;
    return;
  }

  // A second iterator: promote the pending one, then attach the new one.
  merge_iter->AddIterator(first_iter);
  use_merging_iter = true;
  first_iter = nullptr;
  merge_iter->AddIterator(iter);
}
// Hands the built iterator to the caller: merge_iter if more than one
// iterator was added, otherwise first_iter (nullptr if none was added). The
// returned member is reset to nullptr so the destructor does not destroy it.
InternalIterator* MergeIteratorBuilder::Finish() {
  if (use_merging_iter) {
    InternalIterator* built = merge_iter;
    merge_iter = nullptr;
    return built;
  }
  InternalIterator* built = first_iter;
  first_iter = nullptr;
  return built;
}
}