#include "db/compaction/compaction_picker_universal.h"
#include <cstdint>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "file/filename.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/statistics_impl.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class UniversalCompactionBuilder {
public:
UniversalCompactionBuilder(
const ImmutableOptions& ioptions, const InternalKeyComparator* icmp,
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
const std::vector<SequenceNumber>& existing_snapshots,
const SnapshotChecker* snapshot_checker, VersionStorageInfo* vstorage,
UniversalCompactionPicker* picker, LogBuffer* log_buffer,
bool require_max_output_level, const std::string& full_history_ts_low)
: ioptions_(ioptions),
icmp_(icmp),
cf_name_(cf_name),
mutable_cf_options_(mutable_cf_options),
mutable_db_options_(mutable_db_options),
vstorage_(vstorage),
picker_(picker),
log_buffer_(log_buffer),
require_max_output_level_(require_max_output_level),
allow_ingest_behind_(ioptions.cf_allow_ingest_behind ||
ioptions.allow_ingest_behind),
full_history_ts_low_(full_history_ts_low) {
assert(icmp_);
const auto* ucmp = icmp_->user_comparator();
assert(ucmp);
if (ucmp->timestamp_size() == 0) {
earliest_snapshot_ = existing_snapshots.empty()
? kMaxSequenceNumber
: existing_snapshots.at(0);
snapshot_checker_ = snapshot_checker;
}
}
Compaction* PickCompaction();
private:
struct SortedRun {
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted,
bool _level_has_marked_standalone_rangedel)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted),
level_has_marked_standalone_rangedel(
_level_has_marked_standalone_rangedel) {
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
size_t sorted_run_count) const;
int level;
FileMetaData* file;
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
bool level_has_marked_standalone_rangedel;
};
unsigned int GetMaxNumFilesToCompactBasedOnMaxReadAmp(
const int file_num_compaction_trigger, const unsigned int ratio,
int* num_sr_not_compacted_output, int* max_num_runs_output) const {
assert(num_sr_not_compacted_output);
assert(max_num_runs_output);
int max_num_runs =
mutable_cf_options_.compaction_options_universal.max_read_amp;
if (max_num_runs < 0) {
assert(max_num_runs == -1);
max_num_runs = file_num_compaction_trigger;
} else if (max_num_runs == 0) {
if (mutable_cf_options_.compaction_options_universal.stop_style ==
kCompactionStopStyleTotalSize) {
max_num_runs = 1;
double cur_level_max_size =
static_cast<double>(mutable_cf_options_.write_buffer_size);
double total_run_size = 0;
while (cur_level_max_size < static_cast<double>(max_run_size_)) {
total_run_size += cur_level_max_size;
cur_level_max_size = (100.0 + ratio) / 100.0 * total_run_size;
++max_num_runs;
}
} else {
max_num_runs = file_num_compaction_trigger;
}
} else {
}
int num_sr_not_compacted = 0;
for (size_t i = 0; i < sorted_runs_.size(); i++) {
if (sorted_runs_[i].being_compacted == false &&
!sorted_runs_[i].level_has_marked_standalone_rangedel) {
num_sr_not_compacted++;
}
}
*num_sr_not_compacted_output = num_sr_not_compacted;
*max_num_runs_output = max_num_runs;
if (num_sr_not_compacted > max_num_runs) {
return num_sr_not_compacted - max_num_runs + 1;
} else {
return 0;
}
}
Compaction* MaybePickPeriodicCompaction(Compaction* const prev_picked_c) {
if (prev_picked_c != nullptr ||
vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
return prev_picked_c;
}
Compaction* c = PickPeriodicCompaction();
TEST_SYNC_POINT_CALLBACK("PostPickPeriodicCompaction", c);
if (c != nullptr) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: picked for periodic compaction\n",
cf_name_.c_str());
}
return c;
}
Compaction* MaybePickSizeAmpCompaction(Compaction* const prev_picked_c,
int file_num_compaction_trigger) {
if (prev_picked_c != nullptr ||
sorted_runs_.size() <
static_cast<size_t>(file_num_compaction_trigger)) {
return prev_picked_c;
}
Compaction* c = PickCompactionToReduceSizeAmp();
if (c != nullptr) {
TEST_SYNC_POINT("PickCompactionToReduceSizeAmpReturnNonnullptr");
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: picked for size amp compaction \n",
cf_name_.c_str());
}
return c;
}
Compaction* MaybePickCompactionToReduceSortedRunsBasedFileRatio(
Compaction* const prev_picked_c, int file_num_compaction_trigger,
unsigned int ratio) {
if (prev_picked_c != nullptr ||
sorted_runs_.size() <
static_cast<size_t>(file_num_compaction_trigger)) {
return prev_picked_c;
}
Compaction* c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX);
if (c != nullptr) {
TEST_SYNC_POINT("PickCompactionToReduceSortedRunsReturnNonnullptr");
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: picked for size ratio compaction to "
"reduce sorted run\n",
cf_name_.c_str());
}
return c;
}
Compaction* MaybePickCompactionToReduceSortedRuns(
Compaction* const prev_picked_c, int file_num_compaction_trigger,
unsigned int ratio) {
if (prev_picked_c != nullptr ||
sorted_runs_.size() <
static_cast<size_t>(file_num_compaction_trigger)) {
return prev_picked_c;
}
int num_sr_not_compacted = 0;
int max_num_runs = 0;
const unsigned int max_num_files_to_compact =
GetMaxNumFilesToCompactBasedOnMaxReadAmp(file_num_compaction_trigger,
ratio, &num_sr_not_compacted,
&max_num_runs);
if (max_num_files_to_compact == 0) {
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] Universal: skipping compaction to reduce sorted run, num "
"sorted runs not "
"being compacted -- %u, max num runs allowed -- %d, max_run_size "
"-- %" PRIu64 "\n",
cf_name_.c_str(), num_sr_not_compacted, max_num_runs, max_run_size_);
return nullptr;
}
Compaction* c =
PickCompactionToReduceSortedRuns(UINT_MAX, max_num_files_to_compact);
if (c != nullptr) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: picked for sorted run num compaction "
"to reduce sorted run, to "
"compact file num -- %u, max num runs allowed"
"-- %d, max_run_size -- %" PRIu64 "\n",
cf_name_.c_str(), max_num_files_to_compact, max_num_runs,
max_run_size_);
}
return c;
}
Compaction* MaybePickDeleteTriggeredCompaction(
Compaction* const prev_picked_c) {
if (prev_picked_c != nullptr) {
return prev_picked_c;
}
Compaction* c = PickDeleteTriggeredCompaction();
if (c != nullptr) {
TEST_SYNC_POINT("PickDeleteTriggeredCompactionReturnNonnullptr");
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] Universal: picked for delete triggered compaction\n",
cf_name_.c_str());
}
return c;
}
Compaction* PickCompactionToReduceSortedRuns(
unsigned int ratio, unsigned int max_number_of_files_to_compact);
Compaction* PickCompactionToReduceSizeAmp();
Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold);
Compaction* PickDeleteTriggeredCompaction();
bool ShouldSkipMarkedFile(const FileMetaData* file) const;
Compaction* PickCompactionToOldest(size_t start_index,
CompactionReason compaction_reason);
Compaction* PickCompactionWithSortedRunRange(
size_t start_index, size_t end_index, CompactionReason compaction_reason);
Compaction* PickPeriodicCompaction();
bool ShouldSkipLastSortedRunForSizeAmpCompaction() const {
assert(!sorted_runs_.empty());
return mutable_cf_options_.preclude_last_level_data_seconds > 0 &&
ioptions_.num_levels > 2 &&
sorted_runs_.back().level == ioptions_.num_levels - 1 &&
sorted_runs_.size() > 1;
}
bool IsInputFilesNonOverlapping(Compaction* c);
uint64_t GetMaxOverlappingBytes() const;
std::size_t MightExcludeNewL0sToReduceWriteStop(
std::size_t num_l0_input_pre_exclusion, std::size_t end_index,
std::size_t& start_index, uint64_t& candidate_size) const {
if (num_l0_input_pre_exclusion == 0) {
return 0;
}
assert(start_index <= end_index && sorted_runs_.size() > end_index);
assert(mutable_cf_options_.level0_stop_writes_trigger > 0);
const std::size_t level0_stop_writes_trigger = static_cast<std::size_t>(
mutable_cf_options_.level0_stop_writes_trigger);
const std::size_t max_merge_width = static_cast<std::size_t>(
mutable_cf_options_.compaction_options_universal.max_merge_width);
const std::size_t min_merge_width = static_cast<std::size_t>(
mutable_cf_options_.compaction_options_universal.min_merge_width);
const uint64_t max_size_amplification_percent =
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent;
const uint64_t base_sr_size = sorted_runs_[end_index].size;
const std::size_t max_num_l0_to_exclude =
std::min(num_l0_input_pre_exclusion - 1, end_index - start_index - 1);
const std::size_t num_extra_l0_before_write_stop =
level0_stop_writes_trigger -
std::min(level0_stop_writes_trigger, end_index - start_index + 1);
const std::size_t num_l0_to_exclude_for_max_merge_width =
std::min(max_merge_width -
std::min(max_merge_width, num_extra_l0_before_write_stop),
max_num_l0_to_exclude);
const std::size_t num_l0_to_exclude_for_min_merge_width =
std::min(min_merge_width -
std::min(min_merge_width, num_extra_l0_before_write_stop),
max_num_l0_to_exclude);
std::size_t num_l0_to_exclude = 0;
uint64_t candidate_size_post_exclusion = candidate_size;
for (std::size_t possible_num_l0_to_exclude =
num_l0_to_exclude_for_min_merge_width;
possible_num_l0_to_exclude <= num_l0_to_exclude_for_max_merge_width;
++possible_num_l0_to_exclude) {
uint64_t current_candidate_size = candidate_size_post_exclusion;
for (std::size_t j = num_l0_to_exclude; j < possible_num_l0_to_exclude;
++j) {
current_candidate_size -=
sorted_runs_.at(start_index + j).compensated_file_size;
}
if (current_candidate_size * 100 <
max_size_amplification_percent * base_sr_size ||
current_candidate_size < candidate_size * 9 / 10) {
break;
}
num_l0_to_exclude = possible_num_l0_to_exclude;
candidate_size_post_exclusion = current_candidate_size;
}
start_index += num_l0_to_exclude;
candidate_size = candidate_size_post_exclusion;
return num_l0_to_exclude;
}
bool MeetsOutputLevelRequirements(int output_level) const {
return !require_max_output_level_ ||
Compaction::OutputToNonZeroMaxOutputLevel(
output_level, vstorage_->MaxOutputLevel(allow_ingest_behind_));
}
const ImmutableOptions& ioptions_;
const InternalKeyComparator* icmp_;
double score_;
std::vector<SortedRun> sorted_runs_;
uint64_t max_run_size_;
const std::string& cf_name_;
const MutableCFOptions& mutable_cf_options_;
const MutableDBOptions& mutable_db_options_;
VersionStorageInfo* vstorage_;
UniversalCompactionPicker* picker_;
LogBuffer* log_buffer_;
std::optional<SequenceNumber> earliest_snapshot_;
const SnapshotChecker* snapshot_checker_;
std::map<uint64_t, size_t> file_marked_for_compaction_to_sorted_run_index_;
bool require_max_output_level_;
bool allow_ingest_behind_;
const std::string& full_history_ts_low_;
std::vector<UniversalCompactionBuilder::SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, int last_level,
uint64_t* max_run_size);
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
uint64_t file_size);
};
struct InputFileInfo {
InputFileInfo() : InputFileInfo(nullptr, 0, 0) {}
InputFileInfo(FileMetaData* file_meta, size_t l, size_t i)
: f(file_meta), level(l), index(i) {}
FileMetaData* f;
size_t level;
size_t index;
};
struct SmallestKeyHeapComparator {
explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
bool operator()(InputFileInfo i1, InputFileInfo i2) const {
return (ucmp_->CompareWithoutTimestamp(i1.f->smallest.user_key(),
i2.f->smallest.user_key()) > 0);
}
private:
const Comparator* ucmp_;
};
using SmallestKeyHeap =
std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
SmallestKeyHeapComparator>;
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
SmallestKeyHeap smallest_key_priority_q =
SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));
for (size_t l = 0; l < c->num_input_levels(); l++) {
if (c->num_input_files(l) != 0) {
if (l == 0 && c->start_level() == 0) {
for (size_t i = 0; i < c->num_input_files(0); i++) {
smallest_key_priority_q.emplace(c->input(0, i), 0, i);
}
} else {
smallest_key_priority_q.emplace(c->input(l, 0), l, 0);
}
}
}
return smallest_key_priority_q;
}
#ifndef NDEBUG
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
SequenceNumber* smallest_seqno,
SequenceNumber* largest_seqno) {
bool is_first = true;
for (FileMetaData* f : files) {
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
*smallest_seqno = f->fd.smallest_seqno;
*largest_seqno = f->fd.largest_seqno;
} else {
if (f->fd.smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->fd.smallest_seqno;
}
if (f->fd.largest_seqno > *largest_seqno) {
*largest_seqno = f->fd.largest_seqno;
}
}
}
}
#endif
}
bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
auto comparator = icmp_->user_comparator();
int first_iter = 1;
InputFileInfo prev, curr;
SmallestKeyHeap smallest_key_priority_q =
create_level_heap(c, icmp_->user_comparator());
while (!smallest_key_priority_q.empty()) {
curr = smallest_key_priority_q.top();
smallest_key_priority_q.pop();
if (first_iter) {
prev = curr;
first_iter = 0;
} else {
if (comparator->CompareWithoutTimestamp(
prev.f->largest.user_key(), curr.f->smallest.user_key()) >= 0) {
return false;
}
assert(comparator->CompareWithoutTimestamp(
curr.f->largest.user_key(), prev.f->largest.user_key()) > 0);
prev = curr;
}
if (c->level(curr.level) != 0 &&
curr.index < c->num_input_files(curr.level) - 1) {
smallest_key_priority_q.emplace(c->input(curr.level, curr.index + 1),
curr.level, curr.index + 1);
}
}
return true;
}
bool UniversalCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
const int kLevel0 = 0;
if (vstorage->CompactionScore(kLevel0) >= 1) {
return true;
}
if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
return true;
}
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
return false;
}
Compaction* UniversalCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
const std::vector<SequenceNumber>& existing_snapshots,
const SnapshotChecker* snapshot_checker, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, const std::string& full_history_ts_low,
bool require_max_output_level) {
UniversalCompactionBuilder builder(
ioptions_, icmp_, cf_name, mutable_cf_options, mutable_db_options,
existing_snapshots, snapshot_checker, vstorage, this, log_buffer,
require_max_output_level, full_history_ts_low);
return builder.PickCompaction();
}
void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf,
size_t out_buf_size,
bool print_path) const {
if (level == 0) {
assert(file != nullptr);
if (file->fd.GetPathId() == 0 || !print_path) {
snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
} else {
snprintf(out_buf, out_buf_size,
"file %" PRIu64
"(path "
"%" PRIu32 ")",
file->fd.GetNumber(), file->fd.GetPathId());
}
} else {
snprintf(out_buf, out_buf_size, "level %d", level);
}
}
void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
if (level == 0) {
assert(file != nullptr);
snprintf(out_buf, out_buf_size,
"file %" PRIu64 "[%" ROCKSDB_PRIszt
"] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
file->compensated_file_size);
} else {
snprintf(out_buf, out_buf_size,
"level %d[%" ROCKSDB_PRIszt
"] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
level, sorted_run_count, size, compensated_file_size);
}
}
std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
const VersionStorageInfo& vstorage, int last_level,
uint64_t* max_run_size) {
assert(max_run_size);
*max_run_size = 0;
std::vector<UniversalCompactionBuilder::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
if (earliest_snapshot_.has_value() && f->marked_for_compaction) {
file_marked_for_compaction_to_sorted_run_index_.emplace(f->fd.GetNumber(),
ret.size());
}
ret.emplace_back(
0, f, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted,
f->marked_for_compaction && f->FileIsStandAloneRangeTombstone());
*max_run_size = std::max(*max_run_size, f->fd.GetFileSize());
}
for (int level = 1; level <= last_level; level++) {
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
bool level_has_marked_standalone_rangedel = false;
for (FileMetaData* f : vstorage.LevelFiles(level)) {
total_compensated_size += f->compensated_file_size;
total_size += f->fd.GetFileSize();
if (f->being_compacted) {
being_compacted = f->being_compacted;
}
level_has_marked_standalone_rangedel =
level_has_marked_standalone_rangedel ||
(f->marked_for_compaction && f->FileIsStandAloneRangeTombstone());
if (earliest_snapshot_.has_value() && f->marked_for_compaction) {
file_marked_for_compaction_to_sorted_run_index_.emplace(
f->fd.GetNumber(), ret.size());
}
}
if (total_compensated_size > 0) {
ret.emplace_back(level, nullptr, total_size, total_compensated_size,
being_compacted, level_has_marked_standalone_rangedel);
}
*max_run_size = std::max(*max_run_size, total_size);
}
return ret;
}
bool UniversalCompactionBuilder::ShouldSkipMarkedFile(
const FileMetaData* file) const {
assert(file->marked_for_compaction);
if (!earliest_snapshot_.has_value()) {
return false;
}
if (!file->FileIsStandAloneRangeTombstone()) {
return false;
}
if (!DataIsDefinitelyInSnapshot(file->fd.largest_seqno,
earliest_snapshot_.value(),
snapshot_checker_)) {
return true;
}
auto iter = file_marked_for_compaction_to_sorted_run_index_.find(
file->fd.GetNumber());
assert(iter != file_marked_for_compaction_to_sorted_run_index_.end());
size_t idx = iter->second;
const SortedRun* succeeding_sorted_run =
idx < sorted_runs_.size() - 1 ? &sorted_runs_[idx + 1] : nullptr;
if (succeeding_sorted_run &&
succeeding_sorted_run->level_has_marked_standalone_rangedel) {
return true;
}
return false;
}
Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
const int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
const int file_num_compaction_trigger =
mutable_cf_options_.level0_file_num_compaction_trigger;
const unsigned int ratio =
mutable_cf_options_.compaction_options_universal.size_ratio;
if (max_output_level == 0 &&
!MeetsOutputLevelRequirements(0 )) {
return nullptr;
}
max_run_size_ = 0;
sorted_runs_ =
CalculateSortedRuns(*vstorage_, max_output_level, &max_run_size_);
if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
vstorage_->FilesMarkedForCompaction().empty() &&
sorted_runs_.size() < (unsigned int)file_num_compaction_trigger)) {
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
cf_name_.c_str());
TEST_SYNC_POINT_CALLBACK(
"UniversalCompactionBuilder::PickCompaction:Return", nullptr);
return nullptr;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER_MAX_SZ(
log_buffer_, 3072,
"[%s] Universal: sorted runs: %" ROCKSDB_PRIszt " files: %s\n",
cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));
Compaction* c = nullptr;
c = MaybePickPeriodicCompaction(c);
c = MaybePickSizeAmpCompaction(c, file_num_compaction_trigger);
c = MaybePickCompactionToReduceSortedRunsBasedFileRatio(
c, file_num_compaction_trigger, ratio);
c = MaybePickCompactionToReduceSortedRuns(c, file_num_compaction_trigger,
ratio);
c = MaybePickDeleteTriggeredCompaction(c);
if (c == nullptr) {
TEST_SYNC_POINT_CALLBACK(
"UniversalCompactionBuilder::PickCompaction:Return", nullptr);
return nullptr;
}
assert(c->output_level() <= vstorage_->MaxOutputLevel(allow_ingest_behind_));
assert(MeetsOutputLevelRequirements(c->output_level()));
if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
true &&
c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
}
#ifndef NDEBUG
bool is_first = true;
size_t level_index = 0U;
if (c->start_level() == 0) {
for (auto f : *c->inputs(0)) {
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
}
}
level_index = 1U;
}
for (; level_index < c->num_input_levels(); level_index++) {
if (c->num_input_files(level_index) != 0) {
SequenceNumber smallest_seqno = 0U;
SequenceNumber largest_seqno = 0U;
GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
&largest_seqno);
if (is_first) {
is_first = false;
}
}
}
#endif
size_t num_files = 0;
for (auto& each_level : *c->inputs()) {
num_files += each_level.files.size();
}
RecordInHistogram(ioptions_.stats, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
picker_->RegisterCompaction(c);
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_,
full_history_ts_low_);
TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
c);
return c;
}
uint32_t UniversalCompactionBuilder::GetPathId(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
uint64_t accumulated_size = 0;
uint64_t future_size =
file_size *
(100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
assert(!ioptions.cf_paths.empty());
for (; p < ioptions.cf_paths.size() - 1; p++) {
uint64_t target_size = ioptions.cf_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;
}
accumulated_size += target_size;
}
return p;
}
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
unsigned int ratio, unsigned int max_number_of_files_to_compact) {
unsigned int min_merge_width =
mutable_cf_options_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
mutable_cf_options_.compaction_options_universal.max_merge_width;
const SortedRun* sr = nullptr;
bool done = false;
size_t start_index = 0;
unsigned int candidate_count = 0;
unsigned int max_files_to_compact =
std::min(max_merge_width, max_number_of_files_to_compact);
min_merge_width = std::max(min_merge_width, 2U);
assert(sorted_runs_.size() > 0);
for (size_t loop = 0; loop < sorted_runs_.size(); loop++) {
candidate_count = 0;
for (sr = nullptr; loop < sorted_runs_.size(); loop++) {
sr = &sorted_runs_[loop];
if (!sr->being_compacted && !sr->level_has_marked_standalone_rangedel) {
candidate_count = 1;
break;
}
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf));
if (sr->being_compacted) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: %s"
"[%d] being compacted, skipping for compaction to "
"reduce sorted runs",
cf_name_.c_str(), file_num_buf, loop);
} else if (sr->level_has_marked_standalone_rangedel) {
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] Universal: %s"
"[%d] has standalone range tombstone files marked for "
"compaction, skipping for compaction to reduce sorted runs",
cf_name_.c_str(), file_num_buf, loop);
}
sr = nullptr;
}
uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
if (sr != nullptr) {
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: Possible candidate for compaction to "
"reduce sorted runs %s[%d].",
cf_name_.c_str(), file_num_buf, loop);
}
for (size_t i = loop + 1;
candidate_count < max_files_to_compact && i < sorted_runs_.size();
i++) {
const SortedRun* succeeding_sr = &sorted_runs_[i];
if (succeeding_sr->being_compacted ||
succeeding_sr->level_has_marked_standalone_rangedel) {
break;
}
double sz = candidate_size * (100.0 + ratio) / 100.0;
if (sz < static_cast<double>(succeeding_sr->size)) {
break;
}
if (mutable_cf_options_.compaction_options_universal.stop_style ==
kCompactionStopStyleSimilarSize) {
sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
if (sz < static_cast<double>(candidate_size)) {
break;
}
candidate_size = succeeding_sr->compensated_file_size;
} else { candidate_size += succeeding_sr->compensated_file_size;
}
candidate_count++;
}
if (candidate_count >= (unsigned int)min_merge_width) {
start_index = loop;
done = true;
break;
} else {
for (size_t i = loop;
i < loop + candidate_count && i < sorted_runs_.size(); i++) {
const SortedRun* skipping_sr = &sorted_runs_[i];
char file_num_buf[256];
skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s",
cf_name_.c_str(), file_num_buf);
}
}
}
if (!done || candidate_count <= 1) {
return nullptr;
}
size_t first_index_after = start_index + candidate_count;
bool enable_compression = true;
int ratio_to_compress =
mutable_cf_options_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = 0;
for (auto& sorted_run : sorted_runs_) {
total_size += sorted_run.compensated_file_size;
}
uint64_t older_file_size = 0;
for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) {
older_file_size += sorted_runs_[i].size;
if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
enable_compression = false;
break;
}
}
}
uint64_t estimated_total_size = 0;
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += sorted_runs_[i].size;
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
int output_level;
int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
if (first_index_after == sorted_runs_.size()) {
output_level = max_output_level;
} else if (sorted_runs_[first_index_after].level == 0) {
output_level = 0;
} else {
output_level = sorted_runs_[first_index_after].level - 1;
}
if (!MeetsOutputLevelRequirements(output_level)) {
return nullptr;
}
std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
for (size_t i = start_index; i < first_index_after; i++) {
auto& picking_sr = sorted_runs_[i];
if (picking_sr.level == 0) {
FileMetaData* picking_file = picking_sr.file;
inputs[0].files.push_back(picking_file);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s",
cf_name_.c_str(), file_num_buf);
}
std::vector<FileMetaData*> grandparents;
if (mutable_cf_options_.compaction_options_universal.incremental &&
first_index_after < sorted_runs_.size() &&
sorted_runs_[first_index_after].level > 1) {
grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level);
}
if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluateProximalLevel(
vstorage_, mutable_cf_options_, ioptions_,
start_level, output_level))) {
return nullptr;
}
CompactionReason compaction_reason;
if (max_number_of_files_to_compact == UINT_MAX) {
compaction_reason = CompactionReason::kUniversalSizeRatio;
} else {
compaction_reason = CompactionReason::kUniversalSortedRunNum;
}
return new Compaction(vstorage_, ioptions_, mutable_cf_options_,
mutable_db_options_, std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(vstorage_, mutable_cf_options_,
output_level, 1, enable_compression),
GetCompressionOptions(mutable_cf_options_, vstorage_,
output_level, enable_compression),
Temperature::kUnknown,
0, grandparents,
std::nullopt,
nullptr, compaction_reason,
"", score_,
true);
}
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
assert(!sorted_runs_.empty());
const size_t end_index = ShouldSkipLastSortedRunForSizeAmpCompaction()
? sorted_runs_.size() - 2
: sorted_runs_.size() - 1;
if (sorted_runs_[end_index].being_compacted ||
sorted_runs_[end_index].level_has_marked_standalone_rangedel) {
return nullptr;
}
const uint64_t base_sr_size = sorted_runs_[end_index].size;
size_t start_index = end_index;
uint64_t candidate_size = 0;
size_t num_l0_files = 0;
while (start_index > 0) {
const SortedRun* sr = &sorted_runs_[start_index - 1];
if (sr->being_compacted || sr->level_has_marked_standalone_rangedel) {
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
if (sr->being_compacted) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: stopping for size amp compaction at "
"sorted run undergoing compaction: "
"%s[%" ROCKSDB_PRIszt "]",
cf_name_.c_str(), file_num_buf, start_index - 1);
} else if (sr->level_has_marked_standalone_rangedel) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: stopping for size amp compaction at "
"sorted run that has "
"standalone range "
"tombstone files marked for compaction: "
"%s[%" ROCKSDB_PRIszt "]",
cf_name_.c_str(), file_num_buf, start_index - 1);
}
break;
}
candidate_size += sr->compensated_file_size;
num_l0_files += sr->level == 0 ? 1 : 0;
--start_index;
}
if (start_index == end_index) {
return nullptr;
}
{
const size_t num_l0_to_exclude = MightExcludeNewL0sToReduceWriteStop(
num_l0_files, end_index, start_index, candidate_size);
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] Universal: Excluding for size amp compaction %" ROCKSDB_PRIszt
" latest L0 files to reduce potential write stop "
"triggered by `level0_stop_writes_trigger`",
cf_name_.c_str(), num_l0_to_exclude);
}
{
char file_num_buf[kFormatFileNumberBufSize];
sorted_runs_[start_index].Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
}
const uint64_t ratio = mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent;
if (candidate_size * 100 < ratio * base_sr_size) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: size amp compction not needed. "
"newer-files-total-size %" PRIu64
" earliest-file-size %" PRIu64,
cf_name_.c_str(), candidate_size, base_sr_size);
return nullptr;
} else {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: size amp compaction needed. "
"newer-files-total-size %" PRIu64
" earliest-file-size %" PRIu64,
cf_name_.c_str(), candidate_size, base_sr_size);
}
if (mutable_cf_options_.compaction_options_universal.incremental) {
double fanout_threshold = static_cast<double>(base_sr_size) /
static_cast<double>(candidate_size) * 1.8;
Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold);
if (picked != nullptr) {
return picked;
}
}
return PickCompactionWithSortedRunRange(
start_index, end_index, CompactionReason::kUniversalSizeAmplification);
}
Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
double fanout_threshold) {
assert(sorted_runs_.size() >= 2);
int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level;
if (second_last_level == 0) {
return nullptr;
}
int output_level = sorted_runs_.back().level;
const std::vector<FileMetaData*>& bottom_files =
vstorage_->LevelFiles(output_level);
const std::vector<FileMetaData*>& files =
vstorage_->LevelFiles(second_last_level);
assert(!bottom_files.empty());
assert(!files.empty());
int picked_start_idx = 0;
int picked_end_idx = 0;
double picked_fanout = fanout_threshold;
uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2;
int start_idx = 0;
int bottom_end_idx = 0;
int bottom_start_idx = 0;
uint64_t non_bottom_size = 0;
uint64_t bottom_size = 0;
bool end_bottom_size_counted = false;
for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) {
FileMetaData* end_file = files[end_idx];
int num_skipped = 0;
while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
icmp_->Compare(bottom_files[bottom_end_idx]->largest,
end_file->smallest) < 0) {
if (!end_bottom_size_counted) {
bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
}
bottom_end_idx++;
end_bottom_size_counted = false;
num_skipped++;
}
if (num_skipped > 1) {
start_idx = end_idx;
}
if (start_idx == end_idx) {
non_bottom_size = 0;
bottom_size = 0;
bottom_start_idx = bottom_end_idx;
end_bottom_size_counted = false;
}
non_bottom_size += end_file->fd.file_size;
while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
icmp_->Compare(bottom_files[bottom_end_idx]->smallest,
end_file->largest) < 0) {
if (!end_bottom_size_counted) {
bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
end_bottom_size_counted = true;
}
if (icmp_->Compare(bottom_files[bottom_end_idx]->largest,
end_file->largest) > 0) {
break;
}
bottom_end_idx++;
end_bottom_size_counted = false;
}
if ((non_bottom_size + bottom_size > comp_thres_size ||
end_idx == static_cast<int>(files.size()) - 1) &&
non_bottom_size > 0) { double fanout = static_cast<double>(bottom_size) /
static_cast<double>(non_bottom_size);
if (fanout < picked_fanout) {
picked_start_idx = start_idx;
picked_end_idx = end_idx;
picked_fanout = fanout;
}
while (non_bottom_size + bottom_size > comp_thres_size &&
start_idx <= end_idx) {
non_bottom_size -= files[start_idx]->fd.file_size;
start_idx++;
if (start_idx < static_cast<int>(files.size())) {
while (bottom_start_idx <= bottom_end_idx &&
icmp_->Compare(bottom_files[bottom_start_idx]->largest,
files[start_idx]->smallest) < 0) {
bottom_size -= bottom_files[bottom_start_idx]->fd.file_size;
bottom_start_idx++;
}
}
}
}
}
if (picked_fanout >= fanout_threshold) {
assert(picked_fanout == fanout_threshold);
return nullptr;
}
std::vector<CompactionInputFiles> inputs;
CompactionInputFiles bottom_level_inputs;
CompactionInputFiles second_last_level_inputs;
second_last_level_inputs.level = second_last_level;
bottom_level_inputs.level = output_level;
for (int i = picked_start_idx; i <= picked_end_idx; i++) {
if (files[i]->being_compacted) {
return nullptr;
}
second_last_level_inputs.files.push_back(files[i]);
}
assert(!second_last_level_inputs.empty());
if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&second_last_level_inputs,
nullptr)) {
return nullptr;
}
int parent_index = -1; if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
&second_last_level_inputs,
&bottom_level_inputs, &parent_index,
-1)) {
return nullptr;
}
InternalKey smallest, largest;
picker_->GetRange(second_last_level_inputs, &smallest, &largest);
std::vector<CompactionInputFiles> inputs_reverse;
for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend(); it++) {
SortedRun& sr = *it;
if (sr.level == 0) {
break;
}
std::vector<FileMetaData*> level_inputs;
vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest,
&level_inputs);
if (!level_inputs.empty()) {
inputs_reverse.push_back({});
inputs_reverse.back().level = sr.level;
inputs_reverse.back().files = level_inputs;
picker_->GetRange(inputs_reverse.back(), &smallest, &largest);
}
}
for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) {
inputs.push_back(*it);
}
inputs.push_back(second_last_level_inputs);
inputs.push_back(bottom_level_inputs);
int start_level = Compaction::kInvalidLevel;
for (const auto& in : inputs) {
if (!in.empty()) {
start_level = in.level;
break;
}
}
if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluateProximalLevel(
vstorage_, mutable_cf_options_, ioptions_,
start_level, output_level))) {
return nullptr;
}
uint32_t path_id = 0;
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
true ),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true ),
Temperature::kUnknown,
0, {},
std::nullopt,
nullptr,
CompactionReason::kUniversalSizeAmplification,
"", score_,
true);
}
Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
CompactionInputFiles start_level_inputs;
int output_level;
std::vector<CompactionInputFiles> inputs;
std::vector<FileMetaData*> grandparents;
if (vstorage_->num_levels() == 1) {
int start_index = -1;
start_level_inputs.level = 0;
start_level_inputs.files.clear();
output_level = 0;
for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) {
SortedRun* sr = &sorted_runs_[loop];
if (sr->being_compacted) {
continue;
}
FileMetaData* f = vstorage_->LevelFiles(0)[loop];
if (f->marked_for_compaction && !ShouldSkipMarkedFile(f)) {
start_level_inputs.files.push_back(f);
start_index =
static_cast<int>(loop); break;
}
}
if (start_index < 0) {
return nullptr;
}
for (size_t loop = start_index + 1; loop < sorted_runs_.size(); loop++) {
SortedRun* sr = &sorted_runs_[loop];
if (sr->being_compacted || sr->level_has_marked_standalone_rangedel) {
break;
}
FileMetaData* f = vstorage_->LevelFiles(0)[loop];
start_level_inputs.files.push_back(f);
}
if (start_level_inputs.size() <= 1) {
return nullptr;
}
inputs.push_back(start_level_inputs);
} else {
int start_level;
picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level,
&output_level, &start_level_inputs,
[this](const FileMetaData* file) {
return ShouldSkipMarkedFile(file);
});
if (start_level_inputs.empty()) {
return nullptr;
}
int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
for (output_level = start_level + 1; output_level <= max_output_level;
output_level++) {
if (vstorage_->NumLevelFiles(output_level) != 0) {
break;
}
}
if (output_level > max_output_level) {
if (start_level == 0) {
output_level = max_output_level;
} else {
return nullptr;
}
}
assert(output_level <= max_output_level);
if (!MeetsOutputLevelRequirements(output_level)) {
return nullptr;
}
if (output_level != 0) {
const FileMetaData* starting_l0_file =
(start_level == 0 && start_level_inputs.size() == 1 &&
start_level_inputs.files[0]->FileIsStandAloneRangeTombstone())
? start_level_inputs.files[0]
: nullptr;
if (start_level == 0) {
if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
output_level, nullptr,
starting_l0_file)) {
return nullptr;
}
}
CompactionInputFiles output_level_inputs;
int parent_index = -1;
output_level_inputs.level = output_level;
if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
&start_level_inputs, &output_level_inputs,
&parent_index, -1, false,
starting_l0_file)) {
return nullptr;
}
inputs.push_back(start_level_inputs);
if (!output_level_inputs.empty()) {
inputs.push_back(output_level_inputs);
}
if (picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluateProximalLevel(vstorage_, mutable_cf_options_,
ioptions_, start_level,
output_level))) {
return nullptr;
}
picker_->GetGrandparents(vstorage_, start_level_inputs,
output_level_inputs, &grandparents);
} else {
inputs.push_back(start_level_inputs);
}
}
uint64_t estimated_total_size = 0;
for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
estimated_total_size += f->fd.GetFileSize();
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
0, grandparents, earliest_snapshot_,
snapshot_checker_, CompactionReason::kFilesMarkedForCompaction,
"", score_,
true);
}
Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
size_t start_index, CompactionReason compaction_reason) {
return PickCompactionWithSortedRunRange(start_index, sorted_runs_.size() - 1,
compaction_reason);
}
Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
size_t start_index, size_t end_index, CompactionReason compaction_reason) {
assert(start_index < sorted_runs_.size());
uint64_t estimated_total_size = 0;
for (size_t loop = start_index; loop <= end_index; loop++) {
estimated_total_size += sorted_runs_[loop].size;
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
for (size_t loop = start_index; loop <= end_index; loop++) {
auto& picking_sr = sorted_runs_[loop];
if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file;
inputs[0].files.push_back(f);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
std::string comp_reason_print_string;
if (compaction_reason == CompactionReason::kPeriodicCompaction) {
comp_reason_print_string = "periodic compaction";
} else if (compaction_reason ==
CompactionReason::kUniversalSizeAmplification) {
comp_reason_print_string = "size amp";
} else {
assert(false);
comp_reason_print_string = "unknown: ";
comp_reason_print_string.append(
std::to_string(static_cast<int>(compaction_reason)));
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
cf_name_.c_str(), comp_reason_print_string.c_str(),
file_num_buf);
}
int output_level;
if (end_index == sorted_runs_.size() - 1) {
output_level = max_output_level;
} else {
output_level = sorted_runs_[end_index + 1].level - 1;
}
if (!MeetsOutputLevelRequirements(output_level)) {
return nullptr;
}
if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluateProximalLevel(
vstorage_, mutable_cf_options_, ioptions_,
start_level, output_level))) {
return nullptr;
}
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
true ),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true ),
Temperature::kUnknown,
0, {},
std::nullopt,
nullptr, compaction_reason,
"", score_,
true);
}
Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
cf_name_.c_str());
size_t start_index = sorted_runs_.size();
while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted &&
!sorted_runs_[start_index - 1].level_has_marked_standalone_rangedel) {
start_index--;
}
if (start_index == sorted_runs_.size()) {
return nullptr;
}
if (start_index == sorted_runs_.size() - 1) {
bool included_file_marked = false;
int start_level = sorted_runs_[start_index].level;
FileMetaData* start_file = sorted_runs_[start_index].file;
for (const std::pair<int, FileMetaData*>& level_file_pair :
vstorage_->FilesMarkedForPeriodicCompaction()) {
if (start_level != 0) {
if (start_level == level_file_pair.first) {
included_file_marked = true;
break;
}
} else {
if (start_file == level_file_pair.second) {
included_file_marked = true;
break;
}
}
}
if (!included_file_marked) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: Cannot form a compaction covering file "
"marked for periodic compaction",
cf_name_.c_str());
return nullptr;
}
}
Compaction* c = PickCompactionToOldest(start_index,
CompactionReason::kPeriodicCompaction);
TEST_SYNC_POINT_CALLBACK(
"UniversalCompactionPicker::PickPeriodicCompaction:Return", c);
return c;
}
uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const {
if (!mutable_cf_options_.compaction_options_universal.incremental) {
return std::numeric_limits<uint64_t>::max();
} else {
return mutable_cf_options_.target_file_size_base / 2 * 3;
}
}
}