#pragma once
#include <optional>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
namespace ROCKSDB_NAMESPACE {
class SubcompactionState {
public:
// Compaction this subcompaction state was created for. The main constructor
// asserts that it is non-null.
const Compaction* compaction;
// Optional key bounds supplied at construction and forwarded to
// compaction_outputs_.SetOutputSlitKey().
// NOTE(review): presumably an empty optional means "unbounded" on that side,
// and the exact inclusive/exclusive convention is defined by the callers —
// confirm.
const std::optional<Slice> start, end;
// Status of this subcompaction; copied into SubcompactionJobInfo::status by
// BuildSubcompactionJobInfo().
Status status;
// I/O status of this subcompaction. It is not read anywhere in this header.
IOStatus io_status;
// Defaults to false.
// NOTE(review): presumably set when a listener was notified that the
// subcompaction began, so that completion is reported only in that case —
// confirm against the compaction job.
bool notify_on_subcompaction_completion = false;
// Statistics for this subcompaction; copied into SubcompactionJobInfo::stats
// by BuildSubcompactionJobInfo().
CompactionJobStats compaction_job_stats;
// Identifier given at construction; reported (cast to int) as
// SubcompactionJobInfo::subcompaction_job_id.
const uint32_t sub_job_id;
// Defined out of line.
// NOTE(review): by name, returns the smallest user key among this
// subcompaction's outputs — confirm in the implementation file.
Slice SmallestUserKey() const;
// Defined out of line.
// NOTE(review): by name, returns the largest user key among this
// subcompaction's outputs — confirm in the implementation file.
Slice LargestUserKey() const;
// Defined out of line.
// NOTE(review): presumably iterates over the output files of both output
// groups (regular and proximal level) — confirm in the implementation file.
OutputIterator GetOutputs() const;
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
assert(range_del_agg_ == nullptr);
assert(range_del_agg);
range_del_agg_ = std::move(range_del_agg);
}
void RemoveLastEmptyOutput() {
compaction_outputs_.RemoveLastEmptyOutput();
proximal_level_outputs_.RemoveLastEmptyOutput();
}
// Calls Cleanup() on the regular output group, and additionally on the
// proximal-level group when the compaction supports per-key placement.
void CleanupOutputs() {
  compaction_outputs_.Cleanup();
  if (!compaction->SupportsPerKeyPlacement()) {
    return;
  }
  proximal_level_outputs_.Cleanup();
}
// Fills `subcompaction_job_info` from this subcompaction and its parent
// compaction: column family id/name, current status, sub-job id, input and
// output levels, compaction reason, output compression, accumulated job
// stats and the blob compression type from the mutable CF options.
void BuildSubcompactionJobInfo(
    SubcompactionJobInfo& subcompaction_job_info) const {
  SubcompactionJobInfo& info = subcompaction_job_info;
  const Compaction* const job = compaction;
  const ColumnFamilyData* const column_family = job->column_family_data();

  // Column family identity.
  info.cf_id = column_family->GetID();
  info.cf_name = column_family->GetName();

  // State owned by this subcompaction. The id is stored as uint32_t here but
  // reported as int.
  info.status = status;
  info.subcompaction_job_id = static_cast<int>(sub_job_id);

  // Properties of the parent compaction.
  info.base_input_level = job->start_level();
  info.output_level = job->output_level();
  info.compaction_reason = job->compaction_reason();
  info.compression = job->output_compression();

  info.stats = compaction_job_stats;
  info.blob_compression_type =
      job->mutable_cf_options().blob_compression_type;
}
// Not default-constructible and not copyable: an instance is either built
// from a Compaction with the four-argument constructor or taken over from
// another instance with the move constructor.
SubcompactionState() = delete;
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
// Creates the state for one subcompaction of `c`.
//   c            compaction being executed; must be non-null (asserted).
//   _start/_end  optional key bounds, stored in `start` / `end`.
//   _sub_job_id  stored in `sub_job_id`.
// Two CompactionOutputs groups are built from `c`: compaction_outputs_ with
// the flag `false` and proximal_level_outputs_ with the flag `true`.
SubcompactionState(Compaction* c, const std::optional<Slice> _start,
const std::optional<Slice> _end, uint32_t _sub_job_id)
: compaction(c),
start(_start),
end(_end),
sub_job_id(_sub_job_id),
compaction_outputs_(c, false),
proximal_level_outputs_(c, true) {
assert(compaction != nullptr);
// Only the regular output group receives the start/end bounds;
// proximal_level_outputs_ is left without them.
compaction_outputs_.SetOutputSlitKey(start, end);
}
// Move constructor: transfers all state from `state` into the new object,
// including the recorded subcompaction progress (previously this member was
// omitted, so a moved-to object silently started with default-constructed
// progress).
//
// `compaction`, `start`, `end` and `sub_job_id` are copied; the const members
// cannot be moved from. The remaining members are moved.
SubcompactionState(SubcompactionState&& state) noexcept
    : compaction(state.compaction),
      start(state.start),
      end(state.end),
      status(std::move(state.status)),
      io_status(std::move(state.io_status)),
      notify_on_subcompaction_completion(
          state.notify_on_subcompaction_completion),
      compaction_job_stats(std::move(state.compaction_job_stats)),
      sub_job_id(state.sub_job_id),
      compaction_outputs_(std::move(state.compaction_outputs_)),
      proximal_level_outputs_(std::move(state.proximal_level_outputs_)),
      range_del_agg_(std::move(state.range_del_agg_)),
      subcompaction_progress_(std::move(state.subcompaction_progress_)) {
  // current_outputs_ points at one of the object's own output groups, so the
  // source's pointer cannot be copied verbatim: select the matching member
  // of *this* object instead.
  current_outputs_ = state.current_outputs_ == &state.proximal_level_outputs_
                         ? &proximal_level_outputs_
                         : &compaction_outputs_;
}
// Registers every output file of this subcompaction with `out_edit`.
// Files of the proximal-level group are added first, at
// compaction->GetProximalLevel(); files of the regular group follow, at
// compaction->output_level().
void AddOutputsEdit(VersionEdit* out_edit) const {
  for (const auto& proximal_output : proximal_level_outputs_.outputs_) {
    out_edit->AddFile(compaction->GetProximalLevel(), proximal_output.meta);
  }

  for (const auto& regular_output : compaction_outputs_.outputs_) {
    out_edit->AddFile(compaction->output_level(), regular_output.meta);
  }
}
// Defined out of line.
// NOTE(review): presumably releases resources held by the output groups,
// using `cache` for entries tied to abandoned output files — confirm in the
// implementation file.
void Cleanup(Cache* cache);
// Defined out of line.
// NOTE(review): by name, folds the stats of this subcompaction's output
// groups into `internal_stats` — confirm in the implementation file.
void AggregateCompactionOutputStats(
InternalStats::CompactionStatsFull& internal_stats) const;
// Returns the output group that current_outputs_ points at (initially the
// regular group). The pointer is asserted to be non-null. Note that this
// const accessor hands out a mutable reference.
CompactionOutputs& Current() const {
  assert(current_outputs_ != nullptr);
  return *current_outputs_;
}
// Returns the requested output group: the proximal-level group when
// `is_proximal_level` is true, otherwise the regular group.
// Asking for the proximal-level group asserts that the compaction supports
// per-key placement.
CompactionOutputs* Outputs(bool is_proximal_level) {
  assert(compaction);
  if (!is_proximal_level) {
    return &compaction_outputs_;
  }
  assert(compaction->SupportsPerKeyPlacement());
  return &proximal_level_outputs_;
}
// Returns a pointer to the stats object of the requested output group: the
// proximal-level group when `is_proximal_level` is true, otherwise the
// regular group. Asking for the proximal-level stats asserts that the
// compaction supports per-key placement.
InternalStats::CompactionStats* OutputStats(bool is_proximal_level) {
  assert(compaction);
  CompactionOutputs* group = &compaction_outputs_;
  if (is_proximal_level) {
    assert(compaction->SupportsPerKeyPlacement());
    group = &proximal_level_outputs_;
  }
  return &group->stats_;
}
// Returns the worker CPU time (in the unit reported by
// CompactionOutputs::GetWorkerCPUMicros()) of the regular output group, plus
// that of the proximal-level group when the compaction supports per-key
// placement.
uint64_t GetWorkerCPUMicros() const {
  const uint64_t regular_micros = compaction_outputs_.GetWorkerCPUMicros();
  if (!compaction->SupportsPerKeyPlacement()) {
    return regular_micros;
  }
  return regular_micros + proximal_level_outputs_.GetWorkerCPUMicros();
}
// Returns a non-owning pointer to the range-deletion aggregator, or nullptr
// if none has been assigned via AssignRangeDelAggregator().
CompactionRangeDelAggregator* RangeDelAgg() const {
  CompactionRangeDelAggregator* const aggregator = range_del_agg_.get();
  return aggregator;
}
// True only when an aggregator has been assigned and it reports that it is
// not empty.
bool HasRangeDel() const {
  if (!range_del_agg_) {
    return false;
  }
  return !range_del_agg_->IsEmpty();
}
void SetSubcompactionProgress(
const SubcompactionProgress& subcompaction_progress) {
subcompaction_progress_ = subcompaction_progress;
}
// Returns a mutable reference to the stored progress record, so callers can
// update it in place.
SubcompactionProgress& GetSubcompactionProgressRef() {
  SubcompactionProgress& progress = subcompaction_progress_;
  return progress;
}
// Defined out of line.
// NOTE(review): presumably appends the entry `iter` is positioned on to the
// proximal-level output group when `use_proximal_output` is true and to the
// regular group otherwise, using `open_file_func` / `close_file_func` to
// start and finish output files as needed — confirm in the implementation
// file, including how `prev_iter_output_internal_key` is used.
Status AddToOutput(const CompactionIterator& iter, bool use_proximal_output,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func,
const ParsedInternalKey& prev_iter_output_internal_key);
// Closes the outputs of both groups and returns the resulting status.
//
// `curr_status` is not checked here; it is passed into CloseOutput() and the
// status each call returns is threaded into the next one.
//
// With per-key placement the proximal-level group is closed first. Without
// it, that group is asserted to have neither a builder nor any output. The
// regular group is always closed last. Both groups are given the same
// range-deletion aggregator pointer.
Status CloseCompactionFiles(const Status& curr_status,
                            const CompactionFileOpenFunc& open_file_func,
                            const CompactionFileCloseFunc& close_file_func) {
  const auto per_key_placement = compaction->SupportsPerKeyPlacement();
  Status result = curr_status;

  if (!per_key_placement) {
    assert(!proximal_level_outputs_.HasBuilder());
    assert(!proximal_level_outputs_.HasOutput());
  } else {
    result = proximal_level_outputs_.CloseOutput(
        result, range_del_agg_.get(), open_file_func, close_file_func);
  }

  result = compaction_outputs_.CloseOutput(result, range_del_agg_.get(),
                                           open_file_func, close_file_func);
  return result;
}
private:
// Regular output group: constructed with the flag `false`, returned by
// Outputs(false), and the only group that receives the start/end bounds.
CompactionOutputs compaction_outputs_;
// Proximal-level output group: constructed with the flag `true`. Accessors
// that hand it out assert compaction->SupportsPerKeyPlacement().
CompactionOutputs proximal_level_outputs_;
// Points at one of the two groups above; starts at the regular group. The
// move constructor re-targets it at the new object's own member.
CompactionOutputs* current_outputs_ = &compaction_outputs_;
// Null until AssignRangeDelAggregator() installs an aggregator.
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
// Written by SetSubcompactionProgress() and exposed by
// GetSubcompactionProgressRef().
SubcompactionProgress subcompaction_progress_;
};
}