#pragma once
#include <algorithm>
#include <cinttypes>
#include <deque>
#include <string>
#include <unordered_set>
#include <vector>
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
namespace ROCKSDB_NAMESPACE {
// Forward declarations: within this header these types are only used through
// a raw pointer (BlobFileBuilder*) or std::unique_ptr members
// (BlobFetcher, PrefetchBufferCollection), so their full definitions are not
// required here.
class BlobFileBuilder;
class BlobFetcher;
class PrefetchBufferCollection;
// Forward-only adapter around an InternalIterator that keeps a running count
// of the entries stepped over with Next(). Range-deletion sentinel keys are
// not included in the count.
//
// Only Seek() and Next() are supported for positioning; every other
// positioning method trips an assertion.
class SequenceIterWrapper : public InternalIterator {
 public:
  // `iter` is the wrapped iterator (not owned by this wrapper as far as this
  // class is concerned: it is never deleted here). `cmp` is used to build the
  // internal key comparator used by Seek(). When `need_count_entries` is
  // true, Seek() is emulated with repeated Next() calls so that the count
  // stays complete.
  SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
                      bool need_count_entries)
      : icmp_(cmp),
        inner_iter_(iter),
        need_count_entries_(need_count_entries) {}

  // --- State queries, forwarded to the wrapped iterator -------------------
  bool Valid() const override { return inner_iter_->Valid(); }

  Status status() const override { return inner_iter_->status(); }

  Slice key() const override { return inner_iter_->key(); }

  Slice value() const override { return inner_iter_->value(); }

  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return inner_iter_->IsDeleteRangeSentinelKey();
  }

  // --- Supported positioning ----------------------------------------------

  // Advances the wrapped iterator by one entry. The entry being left is
  // counted unless it is a range-deletion sentinel key.
  void Next() override {
    const bool leaving_sentinel = inner_iter_->IsDeleteRangeSentinelKey();
    if (!leaving_sentinel) {
      ++num_itered_;
    }
    inner_iter_->Next();
  }

  // Moves to the first entry whose key is not ordered before `target`.
  //
  // In counting mode the wrapper walks forward one entry at a time so that
  // every skipped entry is counted. Otherwise it performs a real Seek() on
  // the wrapped iterator and marks the count as unavailable, since skipped
  // entries were never visited.
  void Seek(const Slice& target) override {
    if (need_count_entries_) {
      while (inner_iter_->Valid()) {
        if (icmp_.Compare(inner_iter_->key(), target) >= 0) {
          break;
        }
        Next();
      }
      return;
    }
    has_num_itered_ = false;
    inner_iter_->Seek(target);
  }

  // --- Unsupported positioning --------------------------------------------
  void SeekToFirst() override { assert(false); }

  void SeekToLast() override { assert(false); }

  void SeekForPrev(const Slice& /*target*/) override { assert(false); }

  void Prev() override { assert(false); }

  // --- Counters -------------------------------------------------------------

  // Number of non-sentinel entries passed over by Next() so far.
  uint64_t NumItered() const { return num_itered_; }

  // False once a real Seek() has been issued on the wrapped iterator, i.e.
  // NumItered() no longer accounts for every entry.
  bool HasNumItered() const { return has_num_itered_; }

 private:
  InternalKeyComparator icmp_;
  InternalIterator* inner_iter_;
  uint64_t num_itered_ = 0;
  bool need_count_entries_;
  bool has_num_itered_ = true;
};
// Iterator that consumes an input InternalIterator and exposes a stream of
// entries through SeekToFirst()/Next(), Valid(), key()/value()/ikey() and
// status(). The processing logic (NextFromInput(), PrepareOutput(), ...) is
// defined out of line and is not visible in this header.
// NOTE(review): member names suggest the iterator applies snapshots, merge
// operands, a compaction filter, range deletions and blob handling while
// iterating -- confirm against the implementation file.
class CompactionIterator {
public:
// Abstract, read-only view of a compaction. CompactionIterator owns one
// through `compaction_`. Every accessor is pure virtual so that alternative
// implementations can be supplied.
class CompactionProxy {
public:
virtual ~CompactionProxy() = default;
virtual int level() const = 0;
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0;
virtual bool bottommost_level() const = 0;
virtual int number_levels() const = 0;
virtual Slice GetLargestUserKey() const = 0;
virtual bool allow_ingest_behind() const = 0;
virtual bool allow_mmap_reads() const = 0;
virtual bool enable_blob_garbage_collection() const = 0;
virtual double blob_garbage_collection_age_cutoff() const = 0;
virtual uint64_t blob_compaction_readahead_size() const = 0;
virtual const Version* input_version() const = 0;
virtual bool DoesInputReferenceBlobFiles() const = 0;
// Returns the underlying Compaction object, if the implementation has one.
virtual const Compaction* real_compaction() const = 0;
virtual bool SupportsPerKeyPlacement() const = 0;
};
// CompactionProxy implementation that forwards every query to a
// `const Compaction*`. The pointer is not owned (it is never deleted here)
// and must be non-null (asserted in the constructor).
class RealCompaction : public CompactionProxy {
public:
explicit RealCompaction(const Compaction* compaction)
: compaction_(compaction) {
assert(compaction_);
}
int level() const override { return compaction_->level(); }
bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
}
bool bottommost_level() const override {
return compaction_->bottommost_level();
}
int number_levels() const override { return compaction_->number_levels(); }
Slice GetLargestUserKey() const override {
return compaction_->GetLargestUserKey();
}
// True when ingest-behind is allowed by either of the two immutable
// option flags read below.
bool allow_ingest_behind() const override {
return compaction_->immutable_options().cf_allow_ingest_behind ||
compaction_->immutable_options().allow_ingest_behind;
}
bool allow_mmap_reads() const override {
return compaction_->immutable_options().allow_mmap_reads;
}
bool enable_blob_garbage_collection() const override {
return compaction_->enable_blob_garbage_collection();
}
double blob_garbage_collection_age_cutoff() const override {
return compaction_->blob_garbage_collection_age_cutoff();
}
uint64_t blob_compaction_readahead_size() const override {
return compaction_->mutable_cf_options().blob_compaction_readahead_size;
}
const Version* input_version() const override {
return compaction_->input_version();
}
bool DoesInputReferenceBlobFiles() const override {
return compaction_->DoesInputReferenceBlobFiles();
}
const Compaction* real_compaction() const override { return compaction_; }
bool SupportsPerKeyPlacement() const override {
return compaction_->SupportsPerKeyPlacement();
}
private:
const Compaction* compaction_;
};
// Constructor taking an optional raw `const Compaction*` (defaults to
// nullptr). Defined out of line.
// NOTE(review): presumably a non-null `compaction` is wrapped in a
// RealCompaction and forwarded to the overload below -- confirm against the
// definition.
// `manual_compaction_canceled` is stored as a reference member, so the
// referenced atomic must outlive the iterator.
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
bool must_count_input_entries, const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
std::optional<SequenceNumber> preserve_seqno_min = {});
// Constructor taking ownership of a CompactionProxy. Note that the position
// of `must_count_input_entries` relative to `compaction` differs from the
// overload above. Defined out of line.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
bool must_count_input_entries,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
std::optional<SequenceNumber> preserve_seqno_min = {});
// Declared here and defined out of line; the class holds std::unique_ptr
// members of types that are only forward-declared in this header.
~CompactionIterator();
void ResetRecordCounts();
// Positioning. Both are defined out of line.
void SeekToFirst();
void Next();
// Accessors for the current entry. They return references to members of
// this iterator, so the references are only meaningful while the iterator
// is alive and has not been advanced.
const Slice& key() const { return key_; }
const Slice& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
// True when the valid bit of `validity_info_` is set.
inline bool Valid() const { return validity_info_.IsValid(); }
// User key of the current entry. When the current entry is flagged as a
// range deletion (`is_range_del_`), the user key comes from the parsed
// internal key; otherwise `current_user_key_` is returned.
const Slice& user_key() const {
if (UNLIKELY(is_range_del_)) {
return ikey_.user_key;
}
return current_user_key_;
}
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
// Whether NumInputEntryScanned() is available; forwarded to the input
// wrapper's HasNumItered().
bool HasNumInputEntryScanned() const { return input_.HasNumItered(); }
// Entry count reported by the input wrapper's NumItered().
uint64_t NumInputEntryScanned() const { return input_.NumItered(); }
// Requires Valid(). Returns true when `at_next_` is set or the merge output
// iterator is currently valid.
bool IsCurrentKeyAlreadyScanned() const {
assert(Valid());
return at_next_ || merge_out_iter_.Valid();
}
// Status of the underlying input iterator (returned by value), as opposed
// to status(), which returns this iterator's own `status_`.
Status InputStatus() const { return input_.status(); }
bool IsDeleteRangeSentinelKey() const { return is_range_del_; }
private:
// Internal processing steps; all defined out of line.
void NextFromInput();
void PrepareOutput();
bool ExtractLargeValueIfNeededImpl();
void ExtractLargeValueIfNeeded();
void GarbageCollectBlobIfNeeded();
bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
// NOTE(review): presumably returns the earliest snapshot in which sequence
// number `in` is visible and reports the preceding snapshot through
// `prev_snapshot` -- confirm against the definition.
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot);
// True when there is no snapshot checker, or when the checker reports
// `sequence` as kInSnapshot with respect to `job_snapshot_`.
inline bool KeyCommitted(SequenceNumber sequence) {
return snapshot_checker_ == nullptr ||
snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
SnapshotCheckerResult::kInSnapshot;
}
bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
// No-op when `timestamp_size_` is zero. Otherwise copies the timestamp
// extracted from the current user key into `curr_ts_` and, when
// `full_history_ts_low_` is non-null, stores the result of comparing that
// timestamp against it in `cmp_with_history_ts_low_`.
inline void UpdateTimestampAndCompareWithFullHistoryLow() {
if (!timestamp_size_) {
return;
}
Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
curr_ts_.assign(ts.data(), ts.size());
if (full_history_ts_low_) {
cmp_with_history_ts_low_ =
cmp_->CompareTimestamp(ts, *full_history_ts_low_);
}
}
// Static helpers computing blob-related state from a CompactionProxy;
// defined out of line.
static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
const CompactionProxy* compaction);
static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
const CompactionProxy* compaction);
static std::unique_ptr<PrefetchBufferCollection>
CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);
// Data members. C++ initializes non-static members in declaration order and
// the constructors are defined out of line, so reorder these with care.
// Wrapper around the input iterator; also provides the scanned-entry count.
SequenceIterWrapper input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
std::unordered_set<SequenceNumber> released_snapshots_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SequenceNumber job_snapshot_;
// May be null; KeyCommitted() treats a null checker as "always committed".
const SnapshotChecker* const snapshot_checker_;
Env* env_;
SystemClock* clock_;
const bool report_detailed_time_;
CompactionRangeDelAggregator* range_del_agg_;
BlobFileBuilder* blob_file_builder_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
// May be null; IsShuttingDown() then reports false.
const std::atomic<bool>* shutting_down_;
// Reference member: the referenced flag must outlive this iterator.
const std::atomic<bool>& manual_compaction_canceled_;
const bool bottommost_level_;
const bool visible_at_tip_;
const SequenceNumber earliest_snapshot_;
std::shared_ptr<Logger> info_log_;
const bool allow_data_in_errors_;
const bool enforce_single_del_contracts_;
// Zero makes UpdateTimestampAndCompareWithFullHistoryLow() a no-op.
const size_t timestamp_size_;
// May be null; checked before use in
// UpdateTimestampAndCompareWithFullHistoryLow().
const std::string* const full_history_ts_low_;
// Context tag stored next to the valid bit in ValidityInfo. The meaning of
// each value is determined by the code that calls SetValid(), which is not
// visible in this header.
enum ValidContext : uint8_t {
kMerge1 = 0,
kMerge2 = 1,
kParseKeyError = 2,
kCurrentKeyUncommitted = 3,
kKeepSDAndClearPut = 4,
kKeepTsHistory = 5,
kKeepSDForConflictCheck = 6,
kKeepSDForSnapshot = 7,
kKeepSD = 8,
kKeepDel = 9,
kNewUserKey = 10,
kRangeDeletion = 11,
kSwapPreferredSeqno = 12,
};
// Packs validity into one byte: bit 0 is the valid flag and the remaining
// bits hold the context passed to SetValid().
struct ValidityInfo {
inline bool IsValid() const { return rep & 1; }
ValidContext GetContext() const {
return static_cast<ValidContext>(rep >> 1);
}
// Marks the state valid and records `ctx` in the upper seven bits.
inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
// Clears both the valid flag and the stored context.
inline void Invalidate() { rep = 0; }
uint8_t rep{0};
} validity_info_;
// State of the current entry, exposed by key(), value(), status(), ikey().
Slice key_;
Slice value_;
Status status_;
ParsedInternalKey ikey_;
bool has_current_user_key_ = false;
bool at_next_ = false;
IterKey current_key_;
Slice current_user_key_;
// Timestamp of the current user key, filled in by
// UpdateTimestampAndCompareWithFullHistoryLow().
std::string curr_ts_;
// Note: the next two members have no in-class initializer.
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
bool has_outputted_key_ = false;
bool clear_and_output_next_key_ = false;
MergeOutputIterator merge_out_iter_;
Status merge_until_status_;
PinnedIteratorsManager pinned_iters_mgr_;
uint64_t blob_garbage_collection_cutoff_file_number_;
std::unique_ptr<BlobFetcher> blob_fetcher_;
std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;
std::string blob_index_;
PinnableSlice blob_value_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;
bool current_key_committed_;
// Result of the most recent CompareTimestamp() against
// `full_history_ts_low_`; only written when that pointer is non-null.
int cmp_with_history_ts_low_;
const int level_;
bool last_key_seq_zeroed_{false};
const SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;
// Steps the input wrapper forward by one entry.
void AdvanceInputIter() { input_.Next(); }
// Repositions the input wrapper at or after `skip_until`.
void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
// Relaxed load of the optional shutdown flag; a null pointer counts as
// "not shutting down".
bool IsShuttingDown() {
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
}
// Relaxed load of the manual-compaction cancellation flag.
bool IsPausingManualCompaction() {
return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
// Set when the current entry is a range deletion; read by user_key() and
// IsDeleteRangeSentinelKey().
bool is_range_del_{false};
};
// Forwards to DataIsDefinitelyInSnapshot(), supplying this iterator's
// snapshot checker, and returns its answer for (`seq`, `snapshot`).
inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
                                                     SequenceNumber snapshot) {
  const bool definitely_in =
      DataIsDefinitelyInSnapshot(seq, snapshot, snapshot_checker_);
  return definitely_in;
}
// Forwards to DataIsDefinitelyNotInSnapshot(), supplying this iterator's
// snapshot checker, and returns its answer for (`seq`, `snapshot`).
inline bool CompactionIterator::DefinitelyNotInSnapshot(
    SequenceNumber seq, SequenceNumber snapshot) {
  const bool definitely_not_in =
      DataIsDefinitelyNotInSnapshot(seq, snapshot, snapshot_checker_);
  return definitely_not_in;
}
}