#include <cinttypes>
#include <deque>
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/file_util.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/concurrent_task_limiter_impl.h"
#include "util/udt_util.h"
namespace ROCKSDB_NAMESPACE {
bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Ask the SstFileManager (when one is configured) whether the projected
  // compaction fits in the allowed space budget; without a manager, the
  // answer is always yes.
  bool has_room = true;
  auto* file_manager = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (file_manager != nullptr) {
    Status bg_error = error_handler_.GetBGError();
    has_room = file_manager->EnoughRoomForCompaction(cfd, inputs, bg_error);
    // bg_error is only consulted by the manager; nothing to handle here.
    bg_error.PermitUncheckedError();
    if (has_room) {
      // Tell the caller the manager accounted space for this compaction.
      *sfm_reserved_compact_space = true;
    }
  }
  if (!has_room) {
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &has_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return has_room;
}
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto* thread_limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions().compaction_thread_limiter.get());
  if (thread_limiter == nullptr) {
    // No limiter configured for this column family: always allowed.
    return true;
  }
  *token = thread_limiter->GetToken(force);
  if (*token == nullptr) {
    // Limiter refused; caller should not schedule this compaction now.
    return false;
  }
  ROCKS_LOG_BUFFER(log_buffer,
                   "Thread limiter [%s] increase [%s] compaction task, "
                   "force: %s, tasks after: %d",
                   thread_limiter->GetName().c_str(), cfd->GetName().c_str(),
                   force ? "true" : "false",
                   thread_limiter->GetOutstandingTask());
  return true;
}
bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
const FlushRequest& flush_req) {
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (cfd->GetAndClearFlushSkipReschedule()) {
return false;
}
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
if (cfd->IsDropped() ||
!cfd->ShouldPostponeFlushToRetainUDT(max_memtable_id)) {
return false;
}
const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
WriteStallCondition write_stall =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->GetUnflushedMemTableCountForWriteStallCheck(),
0,
0, mutable_cf_options,
cfd->ioptions())
.first;
if (write_stall != WriteStallCondition::kNormal) {
return false;
}
return true;
}
IOStatus DBImpl::SyncClosedWals(const WriteOptions& write_options,
                                JobContext* job_context,
                                VersionEdit* synced_wals,
                                bool error_recovery_in_prog) {
  // Thin wrapper around SyncWalImpl restricted to already-closed WALs
  // (first argument false); sync points bracket the operation for tests.
  TEST_SYNC_POINT("DBImpl::SyncClosedWals:Start");
  IOStatus io_s = SyncWalImpl(false, write_options, job_context, synced_wals,
                              error_recovery_in_prog);
  if (io_s.ok()) {
    TEST_SYNC_POINT("DBImpl::SyncClosedWals:end");
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedWals:Failed");
  }
  return io_s;
}
// Flush the immutable memtables of `cfd` to a new L0 SST file and install
// the result into the current Version (non-atomic-flush path).
//
// Preconditions: mutex_ is held and `cfd` has at least one immutable
// memtable pending flush. On success, `superversion_context` receives the
// new superversion and `*made_progress` (if non-null) is set. Failures are
// classified and recorded in error_handler_ as a side effect, except when
// background work is already stopped (skip_set_bg_error below).
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context, FlushReason flush_reason,
    SuperVersionContext* superversion_context, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());
  assert(versions_);
  assert(versions_->GetColumnFamilySet());
  // All IO issued by this job is tagged as flush activity.
  const ReadOptions read_options(Env::IOActivity::kFlush);
  const WriteOptions write_options(Env::IOActivity::kFlush);
  // With multiple column families (or 2PC), a closed WAL may still hold
  // live data of other CFs, so closed WALs must be synced before this
  // flush result is committed.
  const bool needs_to_sync_closed_wals =
      cur_wal_number_ > 0 &&
      (versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1 ||
       allow_2pc());
  // If we will release mutex_ to sync WALs below, new memtables may appear
  // meanwhile; cap the job at the newest memtable that exists right now so
  // its scope cannot grow.
  uint64_t max_memtable_id =
      needs_to_sync_closed_wals
          ? cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */)
          : std::numeric_limits<uint64_t>::max();
  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
      file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
      job_context, flush_reason, log_buffer, directories_.GetDbDir(),
      GetDataDir(cfd, 0U),
      GetCompressionFlush(cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri,
      io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
      db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_);
  FileMetaData file_meta;
  Status s;
  bool need_cancel = false;
  IOStatus log_io_s = IOStatus::OK();
  if (needs_to_sync_closed_wals) {
    // WAL sync can be slow; drop mutex_ around it.
    VersionEdit synced_wals;
    bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
    mutex_.Unlock();
    log_io_s = SyncClosedWals(write_options, job_context, &synced_wals,
                              error_recovery_in_prog);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      // Record the newly synced WALs in the MANIFEST.
      log_io_s = status_to_io_status(
          ApplyWALToManifest(read_options, write_options, &synced_wals));
      TEST_SYNC_POINT_CALLBACK("DBImpl::FlushMemTableToOutputFile:CommitWal:1",
                               nullptr);
    }
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
    }
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedWals:Skip");
  }
  s = log_io_s;
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  // If background work is stopped by a prior error, abort before picking
  // memtables -- unless this flush is itself part of error recovery. In
  // that case, do not overwrite the already-recorded background error at
  // the bottom of this function.
  bool skip_set_bg_error = false;
  if (s.ok() && !error_handler_.GetBGError().ok() &&
      error_handler_.IsBGWorkStopped() &&
      flush_reason != FlushReason::kErrorRecovery &&
      flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
    skip_set_bg_error = true;
    s = error_handler_.GetBGError();
    assert(!s.ok());
    ROCKS_LOG_BUFFER(log_buffer,
                     "[JOB %d] Skip flush due to background error %s",
                     job_context->job_id, s.ToString().c_str());
  }
  if (s.ok()) {
    flush_job.PickMemTable();
    // Once memtables are picked the job must either run or be cancelled.
    need_cancel = true;
  }
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);
  // OnFlushBegin listeners are notified even when the flush will be skipped.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
                     flush_reason);
  bool switched_to_mempurge = false;
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
                      &switched_to_mempurge, &skip_set_bg_error,
                      &error_handler_);
    need_cancel = false;
  }
  if (!s.ok() && need_cancel) {
    // Picked memtables but never ran: release them back to the flush queue.
    flush_job.Cancel();
  }
  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context);
    if (made_progress) {
      *made_progress = true;
    }
    // Log a per-level summary (and blob file range, if any) of the new state.
    const std::string& column_family_name = cfd->GetName();
    Version* const current = cfd->current();
    assert(current);
    const VersionStorageInfo* const storage_info = current->storage_info();
    assert(storage_info);
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     column_family_name.c_str(),
                     storage_info->LevelSummary(&tmp));
    const auto& blob_files = storage_info->GetBlobFiles();
    if (!blob_files.empty()) {
      assert(blob_files.front());
      assert(blob_files.back());
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
          column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
          blob_files.back()->GetBlobFileNumber());
    }
  }
  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
      !skip_set_bg_error) {
    // Classify the failure for the error handler: WAL sync error vs.
    // MANIFEST write error vs. plain flush error (recovery differs).
    if (log_io_s.ok()) {
      if (!versions_->io_status().ok()) {
        // MANIFEST write failed; WAL work (if any) had succeeded.
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }
  // If the job switched to mempurge, no SST file was produced, so skip the
  // completion listeners and SST-file-manager accounting below.
  if (s.ok() && (!switched_to_mempurge)) {
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Register the new SST with the file manager for space accounting.
      std::string file_path = MakeTableFileName(
          cfd->ioptions().cf_paths[0].path, file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path).PermitUncheckedError();
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  return s;
}
Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  // Atomic flush handles all column families of the batch as one unit.
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  // Non-atomic path: the request covers exactly one column family.
  assert(bg_flush_args.size() == 1);
  InitSnapshotContext(job_context);
  const BGFlushArg& arg = bg_flush_args[0];
  ColumnFamilyData* cfd = arg.cfd_;
  // Snapshot the mutable options so the whole flush sees one consistent set.
  const MutableCFOptions cf_options_snapshot =
      cfd->GetLatestMutableCFOptions();
  return FlushMemTableToOutputFile(cfd, cf_options_snapshot, made_progress,
                                   job_context, arg.flush_reason_,
                                   arg.superversion_context_, log_buffer,
                                   thread_pri);
}
// Atomic-flush counterpart of FlushMemTableToOutputFile(): flushes the
// immutable memtables of every column family in `bg_flush_args` and commits
// all results to the MANIFEST together, so after a crash either all or none
// of the flushes are visible.
//
// Called with mutex_ held on a background thread. mutex_ is released while
// syncing WALs and while waiting (on atomic_flush_install_cv_) for earlier
// atomic-flush batches to become installable.
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();
  const ReadOptions read_options(Env::IOActivity::kFlush);
  const WriteOptions write_options(Env::IOActivity::kFlush);
  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }
#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  }
  // All participants of one atomic flush must share the same flush reason.
  for (const auto& bg_flush_arg : bg_flush_args) {
    assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
  }
#endif
  InitSnapshotContext(job_context);
  autovector<FSDirectory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions().cf_paths[0].path;
    // Deduplicate output directories so each is fsynced only once below.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }
    all_mutable_cf_options.emplace_back(cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
    FlushReason flush_reason = bg_flush_args[i].flush_reason_;
    // Individual jobs neither sync their output directory nor write the
    // MANIFEST; both are done once for the whole batch further down.
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
        &shutting_down_, job_context, flush_reason, log_buffer,
        directories_.GetDbDir(), data_dir,
        GetCompressionFlush(cfd->ioptions(), mutable_cf_options), stats_,
        &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri, io_tracer_,
        cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,
        db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_));
  }
  std::vector<FileMetaData> file_meta(num_cfs);
  // std::deque<bool> (not std::vector<bool>) so each element has real,
  // addressable storage: FlushJob::Run() takes a bool*.
  std::deque<bool> switched_to_mempurge(num_cfs, false);
  Status s;
  IOStatus log_io_s = IOStatus::OK();
  assert(num_cfs == static_cast<int>(jobs.size()));
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    FlushReason flush_reason = bg_flush_args[i].flush_reason_;
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id, flush_reason);
  }
  if (cur_wal_number_ > 0) {
    // Sync closed WALs before committing; drop mutex_ because WAL sync can
    // be slow.
    VersionEdit synced_wals;
    bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
    mutex_.Unlock();
    log_io_s = SyncClosedWals(write_options, job_context, &synced_wals,
                              error_recovery_in_prog);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      // Record the newly synced WALs in the MANIFEST.
      log_io_s = status_to_io_status(
          ApplyWALToManifest(read_options, write_options, &synced_wals));
    }
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      // Attribute the error differently depending on whether live WAL data
      // still exists.
      if (wals_total_size_.LoadRelaxed() > 0) {
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
      } else {
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
      }
    }
  }
  s = log_io_s;
  // exec_status[i] = {whether job i was run, its status}; pick_status[i]
  // records whether job i picked memtables (and may need cancellation).
  autovector<std::pair<bool, Status>> exec_status;
  std::vector<bool> pick_status;
  for (int i = 0; i != num_cfs; ++i) {
    exec_status.emplace_back(false, Status::OK());
    pick_status.push_back(false);
  }
  bool flush_for_recovery =
      bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
      bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecoveryRetryFlush;
  bool skip_set_bg_error = false;
  // Abort early if background work was stopped by a prior error, unless
  // this flush is itself part of error recovery. Leave the recorded
  // background error untouched in that case.
  if (s.ok() && !error_handler_.GetBGError().ok() &&
      error_handler_.IsBGWorkStopped() && !flush_for_recovery) {
    s = error_handler_.GetBGError();
    skip_set_bg_error = true;
    assert(!s.ok());
    ROCKS_LOG_BUFFER(log_buffer,
                     "[JOB %d] Skip flush due to background error %s",
                     job_context->job_id, s.ToString().c_str());
  }
  if (s.ok()) {
    for (int i = 0; i != num_cfs; ++i) {
      jobs[i]->PickMemTable();
      pick_status[i] = true;
    }
  }
  if (s.ok()) {
    assert(switched_to_mempurge.size() ==
           static_cast<long unsigned int>(num_cfs));
    // Jobs 1..num_cfs-1 run first; job 0 runs after the sync points below.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
                       &(switched_to_mempurge.at(i)));
      exec_status[i].first = true;
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second = jobs[0]->Run(
        &logs_with_prep_tracker_, file_meta.data() /* file_meta */,
        switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
    exec_status[0].first = true;
    // Prefer the last non-shutdown, non-CF-dropped failure; otherwise keep
    // the last failure of any kind.
    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          error_status = e.second;
        }
      }
    }
    s = error_status.ok() ? s : error_status;
  }
  // A dropped column family alone does not fail the atomic flush.
  if (s.IsColumnFamilyDropped()) {
    s = Status::OK();
  }
  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync each distinct output directory once for all files of the batch.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        IOOptions io_options;
        Status error_status =
            WritableFileWriter::PrepareIOOptions(write_options, io_options);
        if (error_status.ok()) {
          error_status = dir->FsyncWithDirOptions(
              io_options, nullptr,
              DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
        }
        if (!error_status.ok()) {
          s = error_status;
          break;
        }
      }
    }
  } else if (!skip_set_bg_error) {
    // Failure path: cancel jobs that picked memtables but never ran, and
    // roll back jobs that ran successfully -- the batch cannot commit.
    for (int i = 0; i != num_cfs; ++i) {
      if (pick_status[i] && !exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].second.ok() && exec_status[i].first) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(
            mems, false /* rollback_succeeding_memtables -- TODO confirm */);
      }
    }
  }
  if (s.ok()) {
    // Returns {status, whether we must keep waiting}: the batch can only be
    // installed once no older memtable of any participating CF is still
    // pending ahead of the ones this batch picked.
    const auto wait_to_install_func =
        [&]() -> std::pair<Status, bool /* continue to wait */> {
      if (!versions_->io_status().ok()) {
        // Cannot write the MANIFEST after a prior MANIFEST IO error.
        return std::make_pair(versions_->io_status(), false);
      } else if (shutting_down_.load(std::memory_order_acquire)) {
        return std::make_pair(Status::ShutdownInProgress(), false);
      }
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // A dropped CF never blocks installation.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // An older memtable of this CF is still awaiting flush.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // This job picked no memtable, yet one within its id range still
          // exists -- another job must install first.
          ready = false;
          break;
        }
      }
      return std::make_pair(Status::OK(), !ready);
    };
    bool resuming_from_bg_err =
        error_handler_.IsDBStopped() || flush_for_recovery;
    while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
      std::pair<Status, bool> res = wait_to_install_func();
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", &res);
      if (!res.first.ok()) {
        s = res.first;
        break;
      } else if (!res.second) {
        // Ready to install.
        break;
      }
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV", &res);
      // Stop waiting if background work got stopped by an unrelated error.
      if (!flush_for_recovery && error_handler_.IsBGWorkStopped() &&
          !error_handler_.GetBGError().ok()) {
        s = error_handler_.GetBGError();
        assert(!s.ok());
        break;
      }
      atomic_flush_install_cv_.Wait();
      resuming_from_bg_err = error_handler_.IsDBStopped() || flush_for_recovery;
    }
    // Pick up any error recorded while we were waiting.
    if (!resuming_from_bg_err) {
      if (s.ok()) {
        s = error_handler_.GetBGError();
      }
    } else if (s.ok()) {
      s = error_handler_.GetRecoveryError();
    }
    if (!s.ok()) {
      // Waiting failed after all jobs ran successfully: roll them all back.
      for (int i = 0; i != num_cfs; ++i) {
        assert(exec_status[i].first);
        assert(exec_status[i].second.ok());
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(
            mems, false /* rollback_succeeding_memtables -- TODO confirm */);
      }
    }
  }
  if (s.ok()) {
    // Commit all flush results to the MANIFEST in one atomic write,
    // skipping dropped CFs and jobs that flushed nothing.
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
    autovector<FileMetaData*> tmp_file_meta;
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
        committed_flush_jobs_info;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        tmp_file_meta.emplace_back(&file_meta[i]);
        committed_flush_jobs_info.emplace_back(
            jobs[i]->GetCommittedFlushJobsInfo());
      }
    }
    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mems_list, versions_.get(),
        &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
        committed_flush_jobs_info, &job_context->memtables_to_free,
        directories_.GetDbDir(), log_buffer);
  }
  if (s.ok()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      assert(cfds[i]);
      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(
          cfds[i], &job_context->superversion_contexts[i]);
      // Log a per-level summary (and blob file range, if any) per CF.
      const std::string& column_family_name = cfds[i]->GetName();
      Version* const current = cfds[i]->current();
      assert(current);
      const VersionStorageInfo* const storage_info = current->storage_info();
      assert(storage_info);
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       column_family_name.c_str(),
                       storage_info->LevelSummary(&tmp));
      const auto& blob_files = storage_info->GetBlobFiles();
      if (!blob_files.empty()) {
        assert(blob_files.front());
        assert(blob_files.back());
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
            column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
            blob_files.back()->GetBlobFileNumber());
      }
    }
    if (made_progress) {
      *made_progress = true;
    }
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; s.ok() && i != num_cfs; ++i) {
      // Mempurged jobs produced no SST file; skip file-based notifications.
      if (switched_to_mempurge[i]) {
        continue;
      }
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      if (sfm) {
        // Register the new SST with the file manager for space accounting.
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions().cf_paths[0].path, file_meta[i].fd.GetNumber());
        sfm->OnAddFile(file_path).PermitUncheckedError();
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
  }
  if (!s.ok() && !s.IsColumnFamilyDropped() && !skip_set_bg_error) {
    // Classify the failure for the error handler: WAL sync error vs.
    // MANIFEST write error vs. plain flush error.
    if (log_io_s.ok()) {
      if (!versions_->io_status().ok()) {
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }
  return s;
}
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id, FlushReason flush_reason) {
  // Nothing to do without registered listeners.
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Evaluate L0-based write-stall indicators while still under the mutex.
  const int num_l0_files = cfd->current()->storage_info()->NumLevelFiles(0);
  const bool slowdown_triggered =
      num_l0_files >= mutable_cf_options.level0_slowdown_writes_trigger;
  const bool stop_triggered =
      num_l0_files >= mutable_cf_options.level0_stop_writes_trigger;
  // User callbacks must not run with mutex_ held.
  mutex_.Unlock();
  {
    FlushJobInfo info{};
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions().cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = slowdown_triggered;
    info.triggered_writes_stop = stop_triggered;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = flush_reason;
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
}
void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
  assert(flush_jobs_info != nullptr);
  // Nothing to do without registered listeners.
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Evaluate L0-based write-stall indicators while still under the mutex.
  const int num_l0_files = cfd->current()->storage_info()->NumLevelFiles(0);
  const bool slowdown_triggered =
      num_l0_files >= mutable_cf_options.level0_slowdown_writes_trigger;
  const bool stop_triggered =
      num_l0_files >= mutable_cf_options.level0_stop_writes_trigger;
  // User callbacks must not run with mutex_ held.
  mutex_.Unlock();
  for (auto& info : *flush_jobs_info) {
    info->triggered_writes_slowdown = slowdown_triggered;
    info->triggered_writes_stop = stop_triggered;
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnFlushCompleted(this, *info);
    }
    TEST_SYNC_POINT("DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted");
  }
  flush_jobs_info->clear();
  mutex_.Lock();
}
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin_without_ts,
                            const Slice* end_without_ts) {
  // Refuse immediately if manual compactions are paused/aborted/cancelled.
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kCompactionAborted);
  }
  if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0) {
    // No user-defined timestamps: forward the user range unchanged.
    return CompactRangeInternal(options, column_family, begin_without_ts,
                                end_without_ts, "");
  }
  // Append timestamps so the range covers all versions of the user keys.
  std::string begin_str;
  std::string end_str;
  auto [begin, end] =
      MaybeAddTimestampsToRange(OptSlice::CopyFromPtr(begin_without_ts),
                                OptSlice::CopyFromPtr(end_without_ts), ts_sz,
                                &begin_str, &end_str, false);
  const Slice* begin_with_ts = begin.has_value() ? &begin.value() : nullptr;
  const Slice* end_with_ts = end.has_value() ? &end.value() : nullptr;
  return CompactRangeInternal(options, column_family, begin_with_ts,
                              end_with_ts, "");
}
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                        std::string ts_low) {
  // Resolve the target column family; a null handle means the default CF.
  ColumnFamilyData* cfd = nullptr;
  if (column_family != nullptr) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(cfh != nullptr);
    cfd = cfh->cfd();
  } else {
    cfd = default_cf_handle_->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  // Validate that UDTs are enabled and the provided value has the right size.
  const size_t cf_ts_sz = cfd->user_comparator()->timestamp_size();
  if (cf_ts_sz == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  if (cf_ts_sz != ts_low.size()) {
    return Status::InvalidArgument("ts_low size mismatch");
  }
  return IncreaseFullHistoryTsLowImpl(cfd, ts_low);
}
// Persist a new full_history_ts_low for `cfd` through a VersionEdit.
// `ts_low` must already match the comparator's timestamp size. Returns
// InvalidArgument if `ts_low` would move the value backwards, and TryAgain
// if a concurrent update installed a higher value while LogAndApply ran.
Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
                                            std::string ts_low) {
  VersionEdit edit;
  edit.SetColumnFamily(cfd->GetID());
  edit.SetFullHistoryTsLow(ts_low);
  const ReadOptions read_options;
  const WriteOptions write_options;
  TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
                           &edit);
  InstrumentedMutexLock l(&mutex_);
  std::string current_ts_low = cfd->GetFullHistoryTsLow();
  const Comparator* ucmp = cfd->user_comparator();
  assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
  // full_history_ts_low may only move forward.
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
    std::stringstream oss;
    oss << "Current full_history_ts_low: "
        << ucmp->TimestampToString(current_ts_low)
        << " is higher than provided ts: " << ucmp->TimestampToString(ts_low)
        << std::endl;
    return Status::InvalidArgument(oss.str());
  }
  Status s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
                                    &mutex_, directories_.GetDbDir());
  if (!s.ok()) {
    return s;
  }
  // Re-read after LogAndApply: a concurrent caller may have installed a
  // higher value in the meantime (the re-check below implies the mutex can
  // be released inside LogAndApply). Report the race as TryAgain.
  current_ts_low = cfd->GetFullHistoryTsLow();
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
    std::stringstream oss;
    oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
        << " is set to be higher than the requested "
           "timestamp: "
        << Slice(ts_low).ToString(true) << std::endl;
    return Status::TryAgain(oss.str());
  }
  return Status::OK();
}
// Core implementation behind CompactRange(): optionally raises
// full_history_ts_low, flushes memtables overlapping the range, runs manual
// compaction(s) appropriate for the column family's compaction style, and
// finally honors options.change_level by refitting the output level.
// `begin`/`end` already include timestamps when applicable; `trim_ts`
// (possibly empty) is forwarded to RunManualCompaction.
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end,
                                    const std::string& trim_ts) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  if (options.target_path_id >= cfd->ioptions().cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }
  if (options.change_level &&
      cfd->ioptions().compaction_style == kCompactionStyleFIFO) {
    return Status::NotSupported(
        "FIFO compaction does not support change_level.");
  }
  bool flush_needed = true;
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    // Raising full_history_ts_low is mutually exclusive with a key range.
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLowImpl(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }
  Status s;
  if (begin != nullptr && end != nullptr) {
    // Flush only if the requested range actually overlaps the memtables.
    UserKeyRange range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
    CleanupSuperVersion(super_version);
  }
  if (s.ok() && flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }
  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
  if (cfd->ioptions().compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Multi-level universal: compact everything into the last level, or the
    // one above it when ingest-behind reserves the bottommost level.
    final_output_level = cfd->NumberLevels() - 1;
    if (cfd->AllowIngestBehind()) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false /* disallow_trivial_move */,
                            std::numeric_limits<uint64_t>::max(), trim_ts);
  } else if (cfd->ioptions().compaction_style == kCompactionStyleFIFO) {
    // FIFO keeps all files in level 0.
    final_output_level = 0;
    s = RunManualCompaction(cfd, 0 /* input_level */, final_output_level,
                            options, begin, end, exclusive,
                            false /* disallow_trivial_move */,
                            std::numeric_limits<uint64_t>::max(), trim_ts);
  } else {
    // Find the shallowest level whose data overlaps the requested range.
    int first_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      // An SST partitioner is consulted: when it vetoes a trivial move for
      // the range, the cheaper boundary-based overlap check is used instead
      // of the per-key iterator check.
      SstPartitionerFactory* partitioner_factory =
          current_version->cfd()->ioptions().sst_partitioner_factory.get();
      std::unique_ptr<SstPartitioner> partitioner;
      if (partitioner_factory && begin != nullptr && end != nullptr) {
        SstPartitioner::Context context;
        context.is_full_compaction = false;
        context.is_manual_compaction = true;
        context.output_level = -1;
        context.smallest_user_key = *begin;
        context.largest_user_key = *end;
        partitioner = partitioner_factory->CreatePartitioner(context);
      }
      ReadOptions ro;
      ro.total_order_seek = true;
      ro.io_activity = Env::IOActivity::kCompaction;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        // Precise (per-key) overlap check is possible only with a concrete
        // [begin, end] range.
        bool check_overlap_within_file = false;
        if (begin != nullptr && end != nullptr) {
          check_overlap_within_file = true;
          if (partitioner) {
            if (!partitioner->CanDoTrivialMove(*begin, *end)) {
              check_overlap_within_file = false;
            }
          }
        }
        if (check_overlap_within_file) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            // On iterator error, fall back to the coarser check below.
            check_overlap_within_file = false;
          }
        }
        if (!check_overlap_within_file) {
          // File-boundary-based overlap check.
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
        }
        if (overlap) {
          first_overlapped_level = level;
          break;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      if (cfd->ioptions().compaction_style == kCompactionStyleUniversal) {
        // Single-level universal: compact L0 into itself.
        assert(first_overlapped_level == 0);
        s = RunManualCompaction(
            cfd, first_overlapped_level, first_overlapped_level, options, begin,
            end, exclusive, true /* disallow_trivial_move */,
            std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
            trim_ts);
        final_output_level = first_overlapped_level;
      } else {
        assert(cfd->ioptions().compaction_style == kCompactionStyleLevel);
        // Remember the next file number now, so the optional bottommost pass
        // below can distinguish files created by this manual compaction.
        uint64_t next_file_number = versions_->current_next_file_number();
        int level = first_overlapped_level;
        final_output_level = level;
        int output_level = 0, base_level = 0;
        // Push the range down one level at a time until the last level
        // (dynamic level bytes) or the last non-empty level is reached.
        for (;;) {
          if (level > 0) {
            if (cfd->ioptions().level_compaction_dynamic_level_bytes) {
              assert(final_output_level < cfd->ioptions().num_levels);
              if (final_output_level + 1 == cfd->ioptions().num_levels) {
                break;
              }
            } else {
              InstrumentedMutexLock l(&mutex_);
              if (final_output_level + 1 >=
                  cfd->current()->storage_info()->num_non_empty_levels()) {
                break;
              }
            }
          }
          output_level = level + 1;
          if (cfd->ioptions().level_compaction_dynamic_level_bytes &&
              level == 0) {
            // With dynamic levels, L0 compacts directly into the base level.
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
          // Trivial moves are disallowed when timestamp trimming is
          // requested, since data must be rewritten.
          s = RunManualCompaction(
              cfd, level, output_level, options, begin, end, exclusive,
              !trim_ts.empty() /* disallow_trivial_move */,
              std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
              trim_ts,
              output_level == ColumnFamilyData::kCompactToBaseLevel
                  ? &base_level
                  : nullptr);
          if (!s.ok()) {
            break;
          }
          if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
            assert(base_level > 0);
            level = base_level;
          } else {
            ++level;
          }
          final_output_level = level;
          TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
          TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
        }
        if (s.ok()) {
          assert(final_output_level > 0);
          // Optionally recompact the bottommost level (e.g. to apply a
          // compaction filter to data already at the bottom), per
          // options.bottommost_level_compaction.
          if ((options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kIfHaveCompactionFilter &&
               (cfd->ioptions().compaction_filter != nullptr ||
                cfd->ioptions().compaction_filter_factory != nullptr)) ||
              options.bottommost_level_compaction ==
                  BottommostLevelCompaction::kForceOptimized ||
              options.bottommost_level_compaction ==
                  BottommostLevelCompaction::kForce) {
            s = RunManualCompaction(
                cfd, final_output_level, final_output_level, options, begin,
                end, exclusive, true /* disallow_trivial_move */,
                next_file_number /* max_file_num_to_ignore */, trim_ts);
          }
        }
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }
  if (options.change_level) {
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");
    // Refitting moves files between levels and requires exclusive access:
    // disable manual compactions and pause background work first.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    DisableManualCompaction();
    s = PauseBackgroundWork();
    if (s.ok()) {
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
      // Pause succeeded above, so resuming is expected to succeed.
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
    }
    EnableManualCompaction();
    TEST_SYNC_POINT(
        "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled");
  }
  LogFlush(immutable_db_options_.info_log);
  {
    InstrumentedMutexLock l(&mutex_);
    // The manual compaction may have unblocked other pending work.
    MaybeScheduleFlushOrCompaction();
  }
  return s;
}
// Synchronously compact a caller-specified set of input files into
// `output_level`/`output_path_id` (public DB::CompactFiles entry point),
// then clean up any files made obsolete by the compaction.
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  assert(cfd);
  Status s;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  // CompactionOptions::compression is deprecated; warn when it is set.
  if (compact_options.compression !=
      CompressionType::kDisableCompressionOption) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Found use of deprecated option "
                   "`CompactionOptions::compression`",
                   cfd->GetName().c_str(), job_context.job_id);
  }
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  TEST_SYNC_POINT_CALLBACK("TestCompactFiles:PausingManualCompaction:3",
                           static_cast<void*>(const_cast<std::atomic<int>*>(
                               &manual_compaction_paused_)));
  TEST_SYNC_POINT_CALLBACK("TestCancelCompactFiles:SuccessfulCompaction",
                           static_cast<void*>(const_cast<std::atomic<int>*>(
                               &manual_compaction_paused_)));
  {
    InstrumentedMutexLock l(&mutex_);
    // Pin the current version so its files cannot be deleted while the
    // compaction reads them.
    auto* current = cfd->current();
    current->Ref();
    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);
    current->Unref();
  }
  {
    // Collect obsolete files regardless of outcome; the second argument
    // requests a full scan when the compaction failed.
    InstrumentedMutexLock l(&mutex_);
    FindObsoleteFiles(&job_context, !s.ok());
  }
  // Outside mutex_: flush buffered log messages and delete obsolete files.
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
  return s;
}
// Applies a trivial-move compaction: instead of rewriting data, every input
// file is re-registered at the output level through a version edit. Expects
// the DB mutex to be held. The number of files/bytes moved is accumulated
// into `moved_files`/`moved_bytes`, and the compaction's input files are
// released via the LogAndApply manifest-write callback.
Status DBImpl::PerformTrivialMove(Compaction& c, LogBuffer* log_buffer,
                                  bool& compaction_released,
                                  size_t& moved_files, size_t& moved_bytes) {
  mutex_.AssertHeld();
  ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving %d files to level-%d\n",
                   c.column_family_data()->GetName().c_str(),
                   static_cast<int>(c.num_input_files(0)), c.output_level());
  for (unsigned int input_level = 0; input_level < c.num_input_levels();
       input_level++) {
    // Files already sitting in the output level need no edit.
    if (c.level(input_level) == c.output_level()) {
      continue;
    }
    for (size_t idx = 0; idx < c.num_input_files(input_level); idx++) {
      FileMetaData* meta = c.input(input_level, idx);
      c.edit()->DeleteFile(c.level(input_level), meta->fd.GetNumber());
      // Re-add the identical file metadata at the output level.
      c.edit()->AddFile(
          c.output_level(), meta->fd.GetNumber(), meta->fd.GetPathId(),
          meta->fd.GetFileSize(), meta->smallest, meta->largest,
          meta->fd.smallest_seqno, meta->fd.largest_seqno,
          meta->marked_for_compaction, meta->temperature,
          meta->oldest_blob_file_number, meta->oldest_ancester_time,
          meta->file_creation_time, meta->epoch_number, meta->file_checksum,
          meta->file_checksum_func_name, meta->unique_id,
          meta->compensated_range_deletion_size, meta->tail_size,
          meta->user_defined_timestamps_persisted, meta->min_timestamp,
          meta->max_timestamp);
      moved_bytes += static_cast<size_t>(meta->fd.GetFileSize());
      ROCKS_LOG_BUFFER(
          log_buffer, "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
          c.column_family_data()->GetName().c_str(), meta->fd.GetNumber(),
          c.output_level(), meta->fd.GetFileSize());
    }
    moved_files += c.num_input_files(input_level);
  }

  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);
  // The callback fires while the manifest write is applied so the
  // compaction's input files are released exactly once.
  Status status = versions_->LogAndApply(
      c.column_family_data(), read_options, write_options, c.edit(), &mutex_,
      directories_.GetDbDir(), false,
      nullptr,
      [&c, &compaction_released](const Status& s) {
        c.ReleaseCompactionFiles(s);
        compaction_released = true;
      });
  return status;
}
// Core of CompactFiles(): validates the requested input files, picks the
// compaction, runs it (either as a trivial move or a full compaction job),
// and installs the result. Must be called with the DB mutex held; the mutex
// is released while the compaction job itself runs. `version` must be
// Ref()'d by the caller for the duration of the call.
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  mutex_.AssertHeld();

  // Bail out early if the DB is closing or compactions are aborted/paused/
  // canceled.
  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }
  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kCompactionAborted);
  }
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0 ||
      (compact_options.canceled &&
       compact_options.canceled->load(std::memory_order_acquire))) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  // Translate the caller-supplied file names into file numbers for the
  // compaction picker.
  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  // A negative path id means "pick for me", which is only unambiguous when
  // there is a single cf_path.
  if (output_path_id < 0) {
    if (cfd->ioptions().cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  // With ingest_behind, the bottommost level is reserved for ingested files.
  if (cfd->AllowIngestBehind() &&
      output_level >= cfd->ioptions().num_levels - 1) {
    return Status::InvalidArgument(
        "Exceed the maximum output level defined by "
        "the current compaction algorithm with ingest_behind --- " +
        std::to_string(cfd->ioptions().num_levels - 1));
  }

  std::vector<CompactionInputFiles> input_files;
  Status s = cfd->compaction_picker()->SanitizeAndConvertCompactionInputFiles(
      &input_set, output_level, version, &input_files);
  TEST_SYNC_POINT(
      "DBImpl::CompactFilesImpl::PostSanitizeAndConvertCompactionInputFiles");
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // Disk-space check via the SstFileManager (if one is configured).
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);
  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }

  // From this point on we are accounted as a scheduled background compaction;
  // every return path below must decrement bg_compaction_scheduled_.
  bg_compaction_scheduled_++;
  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->PickCompactionForCompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
  assert(c != nullptr);
  c->FinalizeInputInfo(version);
  assert(!c->deletion_compaction());

  // A trivial move only re-links files via a version edit; not possible when
  // a remote compaction service is configured.
  bool is_trivial_move = compact_options.allow_trivial_move &&
                         c->IsTrivialMove() &&
                         immutable_db_options().compaction_service == nullptr;
  if (is_trivial_move) {
    TEST_SYNC_POINT("DBImpl::CompactFilesImpl:TrivialMove");
    bool compaction_released = false;
    size_t moved_files = 0;
    size_t moved_bytes = 0;
    Status status = PerformTrivialMove(
        *c.get(), log_buffer, compaction_released, moved_files, moved_bytes);
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          c->column_family_data(), job_context->superversion_contexts.data());
      if (output_file_names != nullptr) {
        for (const auto& newf : c->edit()->GetNewFiles()) {
          output_file_names->push_back(TableFileName(
              c->immutable_options().cf_paths, newf.second.fd.GetNumber(),
              newf.second.fd.GetPathId()));
        }
      }
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Trivial move succeeded for %zu files, %zu bytes total\n",
          c->column_family_data()->GetName().c_str(), moved_files, moved_bytes);
    } else {
      // PerformTrivialMove() may or may not have reached the callback that
      // releases the input files; only release here if it did not.
      if (!compaction_released) {
        c->ReleaseCompactionFiles(status);
      }
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Trivial move failed: %s\n",
                       c->column_family_data()->GetName().c_str(),
                       status.ToString().c_str());
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
    // Undo the background-compaction accounting and wake any waiters.
    c.reset();
    bg_compaction_scheduled_--;
    if (bg_compaction_scheduled_ == 0) {
      bg_cv_.SignalAll();
    }
    MaybeScheduleFlushOrCompaction();
    return status;
  }

  InitSnapshotContext(job_context);
  // Protect the compaction's prospective output file numbers from being
  // treated as obsolete while the job runs.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()),
      GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
      job_context, table_cache_, &event_logger_,
      c->mutable_cf_options().paranoid_file_checks,
      c->mutable_cf_options().report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER, io_tracer_,
      kManualCompactionCanceledFalse_, compaction_aborted_, db_id_,
      db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
      c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_,
      &bg_bottom_compaction_scheduled_);

  version->storage_info()->ComputeCompactionScore(
      cfd->ioptions(), c->mutable_cf_options(), cfd->GetFullHistoryTsLow());

  compaction_job.Prepare(std::nullopt /* known_single_subcompact */);
  std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
  if (immutable_db_options().compaction_service != nullptr) {
    min_options_file_number_elem.reset(
        new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
  }

  // Run the actual compaction without holding the DB mutex.
  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  // The install step below surfaces the job's status; the Run() status
  // itself is intentionally not consumed here.
  compaction_job.Run().PermitUncheckedError();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  if (immutable_db_options().compaction_service != nullptr) {
    ReleaseOptionsFileNumber(min_options_file_number_elem);
  }

  bool compaction_released = false;
  Status status = compaction_job.Install(&compaction_released);
  if (!compaction_released) {
    // NOTE(review): releases with `s` (the earlier sanitize status, OK at
    // this point) rather than the Install `status` — confirm intended.
    c->ReleaseCompactionFiles(s);
  }
  if (status.ok()) {
    assert(compaction_job.io_status().ok());
    InstallSuperVersionAndScheduleWork(
        c->column_family_data(), job_context->superversion_contexts.data());
  }
  // On failure the IO status is folded into `status` by Install(); safe to
  // mark it checked here.
  compaction_job.io_status().PermitUncheckedError();

  // Release the disk space reservation made before the job started.
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Build the job info outside the mutex: it copies file metadata.
  mutex_.Unlock();
  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, compaction_job_info);
  }
  mutex_.Lock();

  if (status.ok()) {
    // done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down / CF drop.
  } else if (status.IsManualCompactionPaused()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
  } else if (status.IsCompactionAborted()) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log, "[%s] [JOB %d] Compaction aborted",
        c->column_family_data()->GetName().c_str(), job_context->job_id);
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    // Prefer reporting the more specific IO error to the error handler.
    IOStatus io_s = compaction_job.io_status();
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
    } else {
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
  }

  // Report output table and blob files to the caller.
  if (output_file_names != nullptr) {
    for (const auto& newf : c->edit()->GetNewFiles()) {
      output_file_names->push_back(TableFileName(
          c->immutable_options().cf_paths, newf.second.fd.GetNumber(),
          newf.second.fd.GetPathId()));
    }

    for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
      output_file_names->push_back(
          BlobFileName(c->immutable_options().cf_paths.front().path,
                       blob_file.GetBlobFileNumber()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
// Blocks new background work and waits until every currently scheduled
// flush/compaction has drained before returning. Each successful call must
// be balanced by a later ContinueBackgroundWork().
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  // First stop new compactions from being scheduled, then drain whatever is
  // already queued or running.
  ++bg_compaction_paused_;
  while (bg_flush_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_bottom_compaction_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  ++bg_work_paused_;
  return Status::OK();
}
// Re-enables background work previously stopped by PauseBackgroundWork().
// Returns InvalidArgument when there is no matching pause outstanding.
Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument("Background work already unpaused");
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  --bg_compaction_paused_;
  // Kick the scheduler once the last outstanding pause is released.
  if (--bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}
// Invokes OnCompactionBegin() on every registered EventListener. Called with
// the DB mutex held; the mutex is released around the listener callbacks and
// re-acquired before returning. A no-op when there are no listeners, the DB
// is shutting down, or this is a manual compaction that has been paused.
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return;
  }

  // Record that a "begin" notification was delivered, so the matching
  // completion notification will also be sent.
  c->SetNotifyOnCompactionCompleted();

  // Snapshot the L0 file count while still holding the mutex.
  int num_l0_files = c->input_version()->storage_info()->NumLevelFiles(0);
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
    CompactionJobInfo info{};
    info.num_l0_files = num_l0_files;
    BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, &info);
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
    info.status.PermitUncheckedError();
  }
  mutex_.Lock();
}
// Invokes OnCompactionCompleted() on every registered EventListener. Called
// with the DB mutex held; the mutex is released around the callbacks. Only
// fires when the compaction previously delivered a "begin" notification
// (tracked via ShouldNotifyOnCompactionCompleted()).
void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  if (c->ShouldNotifyOnCompactionCompleted() == false) {
    return;
  }

  // Snapshot the L0 file count while still holding the mutex.
  int num_l0_files = cfd->current()->storage_info()->NumLevelFiles(0);
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info{};
    info.num_l0_files = num_l0_files;
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, &info);
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
}
// Moves every file currently on `level` to `target_level` via a version edit
// (no data is rewritten). `target_level < 0` means "find the lowest empty
// level the files fit in". Fails with NotSupported when the move would pass
// through non-empty levels, conflict with an ongoing compaction's output
// range, or move out of L0. Callers are expected to have quiesced background
// work (see the PauseBackgroundWork() call in CompactRange).
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);

  SuperVersionContext sv_context(/* create_superversion */ true);

  InstrumentedMutexLock guard_lock(&mutex_);

  auto* vstorage = cfd->current()->storage_info();
  if (vstorage->LevelFiles(level).empty()) {
    return Status::OK();
  }

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  // NOTE: every early return below must reset refitting_level_ to false.
  refitting_level_ = true;

  const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  if (to_level != level) {
    std::vector<CompactionInputFiles> input(1);
    input[0].level = level;
    for (auto& f : vstorage->LevelFiles(level)) {
      input[0].files.push_back(f);
    }
    InternalKey refit_level_smallest;
    InternalKey refit_level_largest;
    cfd->compaction_picker()->GetRange(input[0], &refit_level_smallest,
                                       &refit_level_largest);
    if (to_level > level) {
      if (level == 0) {
        refitting_level_ = false;
        return Status::NotSupported(
            "Cannot change from level 0 to other levels.");
      }
      // Check levels are empty for a trivial move
      for (int l = level + 1; l <= to_level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
        if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
                                            refit_level_largest.user_key(),
                                            l)) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target "
              "will have some ongoing compaction's output.");
        }
      }
    } else {
      // Moving multiple files into L0 would create overlapping files there.
      if (to_level == 0 && input[0].files.size() > 1) {
        refitting_level_ = false;
        return Status::Aborted(
            "Moving more than 1 file from non-L0 to L0 is not allowed as it "
            "does not bring any benefit to read nor write throughput.");
      }
      for (int l = to_level; l < level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
        if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
                                            refit_level_largest.user_key(),
                                            l)) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target "
              "will have some ongoing compaction's output.");
        }
      }
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    // A placeholder compaction is registered so concurrent pickers see these
    // files as busy while the edit is applied. Most constructor arguments are
    // irrelevant for a pure metadata move.
    std::unique_ptr<Compaction> c(new Compaction(
        vstorage, cfd->ioptions(), mutable_cf_options, mutable_db_options_,
        {input}, to_level,
        MaxFileSizeForLevel(
            mutable_cf_options, to_level,
            cfd->ioptions()
                .compaction_style) /* output file size limit, not used */
        ,
        LLONG_MAX /* max_compaction_bytes, not applicable */,
        0 /* output_path_id, not applicable */, mutable_cf_options.compression,
        mutable_cf_options.compression_opts, Temperature::kUnknown,
        0 /* max_subcompactions */,
        {} /* grandparents */,
        std::nullopt /* earliest_snapshot */, nullptr /* snapshot_checker */,
        CompactionReason::kRefitLevel, "" /* trim_ts */,
        -1 /* score */,
        false /* l0_files_might_overlap, assumed — TODO confirm */));
    cfd->compaction_picker()->RegisterCompaction(c.get());
    TEST_SYNC_POINT("DBImpl::ReFitLevel:PostRegisterCompaction");
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());

    // Delete each file from `level` and re-add identical metadata at
    // `to_level`.
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(
          to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(),
          f->smallest, f->largest, f->fd.smallest_seqno, f->fd.largest_seqno,
          f->marked_for_compaction, f->temperature, f->oldest_blob_file_number,
          f->oldest_ancester_time, f->file_creation_time, f->epoch_number,
          f->file_checksum, f->file_checksum_func_name, f->unique_id,
          f->compensated_range_deletion_size, f->tail_size,
          f->user_defined_timestamps_persisted, f->min_timestamp,
          f->max_timestamp);
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

    Status status =
        versions_->LogAndApply(cfd, read_options, write_options, &edit, &mutex_,
                               directories_.GetDbDir());

    // Unregister the placeholder compaction regardless of outcome.
    c->MarkFilesBeingCompacted(false);
    cfd->compaction_picker()->UnregisterCompaction(c.get());
    c.reset();

    InstallSuperVersionAndScheduleWork(cfd, &sv_context);

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
    sv_context.Clean();
    refitting_level_ = false;

    return status;
  }

  refitting_level_ = false;
  return Status::OK();
}
// Returns the configured number of LSM levels for `column_family`.
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto* const handle =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return handle->cfd()->NumberLevels();
}
// Returns the current level-0 stop-writes trigger for `column_family`, read
// from its super version's mutable options under the DB mutex.
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto* const handle =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  InstrumentedMutexLock lock(&mutex_);
  auto* const sv = handle->cfd()->GetSuperVersion();
  return sv->mutable_cf_options.level0_stop_writes_trigger;
}
// Flushes every (non-dropped) column family. With atomic_flush enabled this
// is a single atomic flush; otherwise each family is flushed in turn. Called
// with the DB mutex held; the mutex is released around each flush call.
// ColumnFamilyDropped errors are treated as success.
Status DBImpl::FlushAllColumnFamilies(const FlushOptions& flush_options,
                                      FlushReason flush_reason) {
  mutex_.AssertHeld();
  Status status;
  if (immutable_db_options_.atomic_flush) {
    mutex_.Unlock();
    status = AtomicFlushMemTables(flush_options, flush_reason);
    if (status.IsColumnFamilyDropped()) {
      status = Status::OK();
    }
    mutex_.Lock();
  } else {
    // GetRefedColumnFamilySet() keeps each cfd alive across the unlock.
    for (auto cfd : versions_->GetRefedColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      mutex_.Unlock();
      status = FlushMemTable(cfd, flush_options, flush_reason);
      TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:1");
      TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:2");
      mutex_.Lock();
      // Stop on real errors; a dropped CF mid-iteration is not an error.
      if (!status.ok() && !status.IsColumnFamilyDropped()) {
        break;
      } else if (status.IsColumnFamilyDropped()) {
        status = Status::OK();
      }
    }
  }
  return status;
}
// Manually flushes the memtable(s) of a single column family. With
// atomic_flush enabled the request is routed through AtomicFlushMemTables()
// so the flush stays consistent across column families.
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 cfh->GetName().c_str());
  const Status s =
      immutable_db_options_.atomic_flush
          ? AtomicFlushMemTables(flush_options, FlushReason::kManualFlush,
                                 {cfh->cfd()})
          : FlushMemTable(cfh->cfd(), flush_options,
                          FlushReason::kManualFlush);
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 cfh->GetName().c_str(), s.ToString().c_str());
  return s;
}
// Manually flushes several column families. Without atomic_flush each family
// is flushed one at a time, stopping at the first failure; with atomic_flush
// all families are flushed together in a single atomic flush.
Status DBImpl::Flush(const FlushOptions& flush_options,
                     const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  if (!immutable_db_options_.atomic_flush) {
    // Sequential per-family flushes; bail out on the first error.
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
      if (!s.ok()) {
        break;
      }
    }
    return s;
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Manual atomic flush start.\n"
                 "=====Column families:=====");
  for (auto cfh : column_families) {
    auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                   cfhi->GetName().c_str());
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "=====End of column families list=====");

  autovector<ColumnFamilyData*> cfds;
  for (ColumnFamilyHandle* handle : column_families) {
    cfds.emplace_back(static_cast<ColumnFamilyHandleImpl*>(handle)->cfd());
  }
  s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Manual atomic flush finished, status: %s\n"
                 "=====Column families:=====",
                 s.ToString().c_str());
  for (auto cfh : column_families) {
    auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                   cfhi->GetName().c_str());
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "=====End of column families list=====");
  return s;
}
// Drives one manual compaction request to completion. Registers a
// ManualCompactionState, repeatedly picks sub-compactions for the requested
// range, and schedules them on the LOW (or BOTTOM) thread pool, waiting on
// bg_cv_ between rounds until the whole range is done, paused, or aborted.
// If `exclusive`, waits for all other background compactions to drain first.
// On success, `*final_output_level` (if non-null) is set to the level the
// data ended up on.
Status DBImpl::RunManualCompaction(
    ColumnFamilyData* cfd, int input_level, int output_level,
    const CompactRangeOptions& compact_range_options, const Slice* begin,
    const Slice* end, bool exclusive, bool disallow_trivial_move,
    uint64_t max_file_num_to_ignore, const std::string& trim_ts,
    int* final_output_level) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;
  CompactionArg* ca = nullptr;

  bool scheduled = false;
  bool unscheduled = false;
  Env::Priority thread_pool_priority = Env::Priority::TOTAL;
  bool manual_conflict = false;

  ManualCompactionState manual(
      cfd, input_level, output_level, compact_range_options.target_path_id,
      exclusive, disallow_trivial_move, compact_range_options.canceled);
  // For universal/FIFO compaction the entire key range is always compacted,
  // so the caller-provided begin/end bounds are ignored.
  if (begin == nullptr ||
      cfd->ioptions().compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions().compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
  } else {
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->ioptions().compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions().compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
  } else {
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  // Fail fast if manual compactions are globally paused or aborted before we
  // even register this request.
  if (manual_compaction_paused_ > 0) {
    TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart");
    manual.status =
        Status::Incomplete(Status::SubCode::kManualCompactionPaused);
    manual.done = true;
    return manual.status;
  }

  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
    int counter = compaction_aborted_.load(std::memory_order_acquire);
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "RunManualCompaction: Aborting due to compaction_aborted_=%d", counter);
    manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
    manual.done = true;
    return manual.status;
  }

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
    // An exclusive manual compaction waits for all other scheduled
    // compactions to finish before it starts picking work.
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
        manual.done = true;
        manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
        break;
      }
      if (manual_compaction_paused_ > 0 || manual.canceled == true) {
        manual.done = true;
        manual.status =
            Status::Incomplete(Status::SubCode::kManualCompactionPaused);
        break;
      }
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  ROCKS_LOG_BUFFER(&log_buffer, "[%s] Manual compaction starting",
                   cfd->GetName().c_str());

  // A manual compaction for a large range may need several picked
  // compactions; loop until the whole range has been processed.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    Compaction* compaction = nullptr;
    // The big condition below both decides whether to wait AND, as a side
    // effect, attempts to pick the next compaction (assigning `manual_end`
    // and `compaction`). We wait when: picking should be deferred, a previous
    // pick is in progress, one is already scheduled, or picking failed due to
    // a conflict with another running compaction.
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        scheduled ||
        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
         ((compaction = manual.cfd->CompactRange(
               manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
               manual.input_level, manual.output_level, compact_range_options,
               manual.begin, manual.end, &manual.manual_end, &manual_conflict,
               max_file_num_to_ignore, trim_ts)) == nullptr &&
          manual_conflict))) {
      if (!manual.done) {
        bg_cv_.Wait();
      }
      // If manual compactions were paused while our task sat in the thread
      // pool queue, pull it back out so the pool is not blocked.
      if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
        assert(thread_pool_priority != Env::Priority::TOTAL);
        auto unscheduled_task_num = env_->UnSchedule(
            GetTaskTag(TaskType::kManualCompaction), thread_pool_priority);
        if (unscheduled_task_num > 0) {
          ROCKS_LOG_INFO(
              immutable_db_options_.info_log,
              "[%s] Unscheduled %d number of manual compactions from the "
              "thread-pool",
              cfd->GetName().c_str(), unscheduled_task_num);
          // it may unschedule other manual compactions, notify others.
          bg_cv_.SignalAll();
        }
        unscheduled = true;
        TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled");
      }
      // The scheduled task ran but did not finish the range; allow a new
      // pick on the next loop iteration.
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
    } else if (!scheduled) {
      if (compaction == nullptr) {
        // Nothing left to compact in the requested range: done.
        manual.done = true;
        if (final_output_level) {
          // `input_level` may be kCompactAllLevels while `output_level` is
          // kCompactToBaseLevel; resolve the latter to the current base level.
          *final_output_level = output_level;
          if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
            *final_output_level = cfd->current()->storage_info()->base_level();
          }
        }
        bg_cv_.SignalAll();
        continue;
      }
      // Hand the picked compaction to a background thread.
      ca = new CompactionArg;
      ca->db = this;
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
      // A forced token request must always succeed for manual compactions.
      if (!RequestCompactionToken(
              cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
        // Don't throttle manual compaction, only count outstanding tasks.
        assert(false);
      }
      ca->prepicked_compaction->need_repick = false;
      manual.incomplete = false;
      // Bottommost-level work goes to the BOTTOM pool when one exists.
      if (compaction->bottommost_level() &&
          env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
        bg_bottom_compaction_scheduled_++;
        ca->compaction_pri_ = Env::Priority::BOTTOM;
        env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
                       Env::Priority::BOTTOM,
                       GetTaskTag(TaskType::kManualCompaction),
                       &DBImpl::UnscheduleCompactionCallback);
        thread_pool_priority = Env::Priority::BOTTOM;
      } else {
        bg_compaction_scheduled_++;
        ca->compaction_pri_ = Env::Priority::LOW;
        env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW,
                       GetTaskTag(TaskType::kManualCompaction),
                       &DBImpl::UnscheduleCompactionCallback);
        thread_pool_priority = Env::Priority::LOW;
      }
      scheduled = true;
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
      if (final_output_level) {
        *final_output_level = compaction->output_level();
      }
    }
    // If nothing is scheduled, re-check for pause/abort so the loop can't
    // spin forever.
    if (!scheduled) {
      if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
        manual.done = true;
        manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
      } else if (manual_compaction_paused_ > 0 || manual.canceled == true) {
        manual.done = true;
        manual.status =
            Status::Incomplete(Status::SubCode::kManualCompactionPaused);
      }
    }
  }

  log_buffer.FlushBufferToLog();
  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  // If the compaction was paused, automatic compactions may have been held
  // back; give the scheduler a chance to resume them.
  if (manual.status.IsIncomplete() &&
      manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
    MaybeScheduleFlushOrCompaction();
  }
  bg_cv_.SignalAll();
  return manual.status;
}
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req) {
assert(req != nullptr);
req->flush_reason = flush_reason;
req->cfd_to_max_mem_id_to_persist.reserve(cfds.size());
for (const auto cfd : cfds) {
if (nullptr == cfd) {
continue;
}
uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID(
immutable_db_options_.atomic_flush );
req->cfd_to_max_mem_id_to_persist.emplace(cfd, max_memtable_id);
}
}
// Invokes the OnManualFlushScheduled callback on every registered listener,
// passing one ManualFlushInfo per scheduled column family. A no-op when
// there are no listeners or the DB is shutting down.
void DBImpl::NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
                                          FlushReason flush_reason) {
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  std::vector<ManualFlushInfo> flush_infos;
  flush_infos.reserve(cfds.size());
  for (ColumnFamilyData* cfd : cfds) {
    flush_infos.push_back({cfd->GetID(), cfd->GetName(), flush_reason});
  }
  for (const auto& listener : immutable_db_options_.listeners) {
    listener->OnManualFlushScheduled(this, flush_infos);
  }
}
// Flushes `cfd`'s active memtable (non-atomic-flush path only): switches the
// memtable, enqueues flush request(s), and — when `flush_options.wait` is
// set — blocks until the corresponding flushes finish. May additionally
// force-flush the persistent-stats CF so its memtable does not pin old WALs.
// Joins the write thread(s) unless the caller already did
// (`entered_write_thread`).
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason,
                             bool entered_write_thread) {
  // This method should not be called if atomic_flush is true.
  assert(!immutable_db_options_.atomic_flush);
  // A non-waiting flush cannot make progress while writes are stopped: the
  // write-thread join below would block indefinitely.
  if (!flush_options.wait && write_controller_.IsStopped()) {
    std::ostringstream oss;
    oss << "Writes have been stopped, thus unable to perform manual flush. "
           "Please try again later after writes are resumed";
    return Status::TryAgain(oss.str());
  }
  Status s;
  if (!flush_options.allow_write_stall) {
    bool flush_needed = true;
    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
    if (!s.ok() || !flush_needed) {
      return s;
    }
  }

  const bool needs_to_join_write_thread = !entered_write_thread;
  autovector<FlushRequest> flush_reqs;
  autovector<uint64_t> memtable_ids_to_wait;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    if (needs_to_join_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();

    // Switch to a fresh memtable if the active one has data (or recoverable
    // state / a recovery flush forces it).
    if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load() ||
        IsRecoveryFlush(flush_reason)) {
      s = SwitchMemtable(cfd, &context);
    }
    // Sentinel meaning "flush everything up to the latest memtable".
    const uint64_t flush_memtable_id = std::numeric_limits<uint64_t>::max();
    if (s.ok()) {
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load() ||
          IsRecoveryFlush(flush_reason)) {
        FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
        flush_reqs.emplace_back(std::move(req));
        memtable_ids_to_wait.emplace_back(
            cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));
      }
      // The stats CF is flushed alongside if it would otherwise pin WALs
      // older than every other column family's.
      if (immutable_db_options_.persist_stats_to_disk) {
        ColumnFamilyData* cfd_stats =
            versions_->GetColumnFamilySet()->GetColumnFamily(
                kPersistentStatsColumnFamilyName);
        if (cfd_stats != nullptr && cfd_stats != cfd &&
            !cfd_stats->mem()->IsEmpty()) {
          bool stats_cf_flush_needed = true;
          for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
            if (loop_cfd == cfd_stats || loop_cfd == cfd) {
              continue;
            }
            if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
              stats_cf_flush_needed = false;
            }
          }
          if (stats_cf_flush_needed) {
            ROCKS_LOG_INFO(immutable_db_options_.info_log,
                           "Force flushing stats CF with manual flush of %s "
                           "to avoid holding old logs",
                           cfd->GetName().c_str());
            s = SwitchMemtable(cfd_stats, &context);
            FlushRequest req{flush_reason, {{cfd_stats, flush_memtable_id}}};
            flush_reqs.emplace_back(std::move(req));
            memtable_ids_to_wait.emplace_back(
                cfd_stats->imm()->GetLatestMemTableID(
                    false /* for_atomic_flush */));
          }
        }
      }
    }

    if (s.ok() && !flush_reqs.empty()) {
      // Mark every involved immutable memtable list as flush-requested.
      for (const auto& req : flush_reqs) {
        assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
        ColumnFamilyData* loop_cfd =
            req.cfd_to_max_mem_id_to_persist.begin()->first;
        loop_cfd->imm()->FlushRequested();
      }
      // Keep the CFDs alive while we wait for the flushes below; unref'd
      // after WaitForFlushMemTables().
      if (flush_options.wait) {
        for (const auto& req : flush_reqs) {
          assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
          ColumnFamilyData* loop_cfd =
              req.cfd_to_max_mem_id_to_persist.begin()->first;
          loop_cfd->Ref();
        }
      }
      for (const auto& req : flush_reqs) {
        assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
        ColumnFamilyData* loop_cfd =
            req.cfd_to_max_mem_id_to_persist.begin()->first;
        bool already_queued_for_flush = loop_cfd->queued_for_flush();
        bool flush_req_enqueued = EnqueuePendingFlush(req);
        if (already_queued_for_flush || flush_req_enqueued) {
          loop_cfd->SetFlushSkipReschedule();
        }
      }
      MaybeScheduleFlushOrCompaction();
    }

    if (needs_to_join_write_thread) {
      write_thread_.ExitUnbatched(&w);
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
    }
  }
  NotifyOnManualFlushScheduled({cfd}, flush_reason);
  TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    autovector<ColumnFamilyData*> cfds;
    autovector<const uint64_t*> flush_memtable_ids;
    assert(flush_reqs.size() == memtable_ids_to_wait.size());
    for (size_t i = 0; i < flush_reqs.size(); ++i) {
      assert(flush_reqs[i].cfd_to_max_mem_id_to_persist.size() == 1);
      cfds.push_back(flush_reqs[i].cfd_to_max_mem_id_to_persist.begin()->first);
      flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
    }
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
        flush_reason);
    InstrumentedMutexLock lock_guard(&mutex_);
    // Drop the refs taken above; may delete a CF that was dropped meanwhile.
    for (auto* tmp_cfd : cfds) {
      tmp_cfd->UnrefAndTryDelete();
    }
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
  return s;
}
// Flush the memtables of a set of column families atomically: the selected
// column families are switched and flushed together so their data is
// consistent on disk. Requires immutable_db_options_.atomic_flush.
//
// If `provided_candidate_cfds` is empty, all live, initialized column
// families are considered (and referenced here); otherwise only the provided
// ones are used and the caller owns their references. Honors
// flush_options.wait and flush_options.allow_write_stall.
Status DBImpl::AtomicFlushMemTables(
    const FlushOptions& flush_options, FlushReason flush_reason,
    const autovector<ColumnFamilyData*>& provided_candidate_cfds,
    bool entered_write_thread) {
  assert(immutable_db_options_.atomic_flush);
  // A non-waiting manual flush cannot proceed while writes are stopped, so
  // fail fast with TryAgain instead of blocking.
  if (!flush_options.wait && write_controller_.IsStopped()) {
    std::ostringstream oss;
    oss << "Writes have been stopped, thus unable to perform manual flush. "
           "Please try again later after writes are resumed";
    return Status::TryAgain(oss.str());
  }
  Status s;
  autovector<ColumnFamilyData*> candidate_cfds;
  if (provided_candidate_cfds.empty()) {
    // Snapshot all live CFs under the mutex; each is Ref()-ed so it cannot
    // go away while we work on it below.
    {
      InstrumentedMutexLock l(&mutex_);
      for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized()) {
          cfd->Ref();
          candidate_cfds.push_back(cfd);
        }
      }
    }
  } else {
    candidate_cfds = provided_candidate_cfds;
  }
  if (!flush_options.allow_write_stall) {
    // Wait until flushing would not stall writes for any candidate CF, and
    // count how many CFs actually still need a flush.
    int num_cfs_to_flush = 0;
    for (auto cfd : candidate_cfds) {
      bool flush_needed = true;
      s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
      if (!s.ok()) {
        // Only release refs we took ourselves (auto-selected candidates).
        if (provided_candidate_cfds.empty()) {
          for (auto candidate_cfd : candidate_cfds) {
            candidate_cfd->UnrefAndTryDelete();
          }
        }
        return s;
      } else if (flush_needed) {
        ++num_cfs_to_flush;
      }
    }
    if (0 == num_cfs_to_flush) {
      // Nothing needs flushing; release our refs and return OK.
      if (provided_candidate_cfds.empty()) {
        for (auto candidate_cfd : candidate_cfds) {
          candidate_cfd->UnrefAndTryDelete();
        }
      }
      return s;
    }
  }
  const bool needs_to_join_write_thread = !entered_write_thread;
  FlushRequest flush_req;
  autovector<ColumnFamilyData*> cfds;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);
    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    // Enter the write thread(s) unbatched so no concurrent write can land
    // between the memtable switches of different CFs.
    if (needs_to_join_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();
    SelectColumnFamiliesForAtomicFlush(&cfds, candidate_cfds, flush_reason);
    // Candidate refs are no longer needed; `cfds` is what we act on below.
    if (provided_candidate_cfds.empty()) {
      for (auto candidate_cfd : candidate_cfds) {
        candidate_cfd->UnrefAndTryDelete();
      }
    }
    // Switch the active memtable of every selected CF (skipping CFs with
    // nothing to flush unless this is a recovery flush).
    for (auto cfd : cfds) {
      if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load() &&
          !IsRecoveryFlush(flush_reason)) {
        continue;
      }
      cfd->Ref();
      s = SwitchMemtable(cfd, &context);
      cfd->UnrefAndTryDelete();
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      // Stamp the immutable memtables with a common atomic-flush sequence
      // and queue one flush request covering all CFs.
      AssignAtomicFlushSeq(cfds);
      for (auto cfd : cfds) {
        cfd->imm()->FlushRequested();
      }
      if (flush_options.wait) {
        // Extra refs so the CFs survive until the wait below completes.
        for (auto cfd : cfds) {
          cfd->Ref();
        }
      }
      GenerateFlushRequest(cfds, flush_reason, &flush_req);
      EnqueuePendingFlush(flush_req);
      MaybeScheduleFlushOrCompaction();
    }
    if (needs_to_join_write_thread) {
      write_thread_.ExitUnbatched(&w);
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
    }
  }
  NotifyOnManualFlushScheduled(cfds, flush_reason);
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    // Block until every memtable recorded in the flush request has been
    // persisted (or the wait is aborted by an error/shutdown).
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
      flush_memtable_ids.push_back(&(iter.second));
    }
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
        flush_reason);
    InstrumentedMutexLock lock_guard(&mutex_);
    for (auto* cfd : cfds) {
      cfd->UnrefAndTryDelete();
    }
  }
  return s;
}
// Re-request flushes for all column families that still have unflushed
// immutable memtables, as part of error recovery. Called with mutex_ held;
// the mutex is temporarily released if `wait` is true.
Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
                                            bool wait) {
  mutex_.AssertHeld();
  assert(flush_reason == FlushReason::kErrorRecoveryRetryFlush ||
         flush_reason == FlushReason::kCatchUpAfterErrorRecovery);
  // Collect (and reference) every CF with pending immutable memtables.
  autovector<ColumnFamilyData*> cfds;
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (!cfd->IsDropped() && cfd->initialized() &&
        cfd->imm()->NumNotFlushed() != 0) {
      cfd->Ref();
      cfd->imm()->FlushRequested();
      cfds.push_back(cfd);
    }
  }
  autovector<uint64_t> flush_memtable_ids;
  if (immutable_db_options_.atomic_flush) {
    // Atomic flush: one combined request covering all CFs.
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, flush_reason, &flush_req);
    EnqueuePendingFlush(flush_req);
    for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
      flush_memtable_ids.push_back(iter.second);
    }
  } else {
    // Non-atomic flush: one request per CF, persisting everything
    // (max uint64 as the max memtable id to persist).
    for (auto cfd : cfds) {
      flush_memtable_ids.push_back(
          cfd->imm()->GetLatestMemTableID(false ));
      FlushRequest flush_req{
          flush_reason,
          {{cfd,
            std::numeric_limits<uint64_t>::max() }}};
      if (EnqueuePendingFlush(flush_req)) {
        cfd->SetFlushSkipReschedule();
      };
    }
  }
  MaybeScheduleFlushOrCompaction();
  Status s;
  if (wait) {
    // Drop the mutex while waiting for the background flushes to finish.
    mutex_.Unlock();
    autovector<const uint64_t*> flush_memtable_id_ptrs;
    for (auto& flush_memtable_id : flush_memtable_ids) {
      flush_memtable_id_ptrs.push_back(&flush_memtable_id);
    }
    s = WaitForFlushMemTables(cfds, flush_memtable_id_ptrs,
                              true /* resuming_from_bg_err */, flush_reason);
    mutex_.Lock();
  }
  // Release the references taken above.
  for (auto* cfd : cfds) {
    cfd->UnrefAndTryDelete();
  }
  return s;
}
// Wait until flushing `cfd` would not trigger a write stall, or determine
// that no flush is needed anymore. On OK return, *flush_needed reports
// whether the memtable that was active on entry still needs flushing.
// Returns non-OK if the CF is dropped, the DB shuts down, or background
// work is stopped while waiting.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
                                                 bool* flush_needed) {
  {
    *flush_needed = true;
    InstrumentedMutexLock l(&mutex_);
    // Remember the memtable we were asked about; if it gets flushed by
    // someone else while we wait, no flush is needed anymore.
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
    do {
      if (write_stall_condition != WriteStallCondition::kNormal) {
        // Previous iteration predicted a stall; wait for conditions to
        // change before re-evaluating.
        if (error_handler_.IsBGWorkStopped()) {
          return error_handler_.GetBGError();
        }
        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] WaitUntilFlushWouldNotStallWrites"
                       " waiting on stall conditions to clear",
                       cfd->GetName().c_str());
        bg_cv_.Wait();
      }
      if (cfd->IsDropped()) {
        return Status::ColumnFamilyDropped();
      }
      if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::ShutdownInProgress();
      }
      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // The original target memtable has already been flushed elsewhere.
        *flush_needed = false;
        return Status::OK();
      }
      const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();
      // Clearly below both the memtable-merge threshold and the L0
      // compaction trigger: flushing now cannot stall writes.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions().min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
        break;
      }
      // Predict the stall condition assuming one more L0 file — the file
      // the pending flush would produce.
      write_stall_condition =
          ColumnFamilyData::GetWriteStallConditionAndCause(
              cfd->GetUnflushedMemTableCountForWriteStallCheck(),
              vstorage->l0_delay_trigger_count() + 1,
              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
              cfd->ioptions())
              .first;
    } while (write_stall_condition != WriteStallCondition::kNormal);
  }
  return Status::OK();
}
// Block until, for every CF in `cfds`, the memtables up to the matching id
// in `flush_memtable_ids` have been flushed. A null id entry means "all
// currently immutable memtables". `resuming_from_bg_err` keeps the wait
// alive even when the DB is stopped, so error recovery can wait for its own
// flushes.
Status DBImpl::WaitForFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const uint64_t*>& flush_memtable_ids,
    bool resuming_from_bg_err, std::optional<FlushReason> flush_reason) {
  int num = static_cast<int>(cfds.size());
  InstrumentedMutexLock l(&mutex_);
  Status s;
  // Unless resuming from a background error, abandon the wait once the DB
  // is stopped.
  while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      s = Status::ShutdownInProgress();
      return s;
    }
    // A failed recovery aborts the wait with the recovery error.
    if (!error_handler_.GetRecoveryError().ok()) {
      s = error_handler_.GetRecoveryError();
      break;
    }
    // If BG work is stopped by a sub-hard error, the flush cannot proceed;
    // return that error rather than waiting forever.
    if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
        error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
      s = error_handler_.GetBGError();
      return s;
    }
    // Count CFs that are done: either dropped, or flushed far enough.
    int num_dropped = 0;
    int num_finished = 0;
    for (int i = 0; i < num; ++i) {
      if (cfds[i]->IsDropped()) {
        ++num_dropped;
      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
                 (flush_memtable_ids[i] != nullptr &&
                  cfds[i]->imm()->GetEarliestMemTableID() >
                      *flush_memtable_ids[i])) {
        // For external file ingestion, additionally require the
        // superversion's immutable-memtable list to have caught up.
        if (!flush_reason.has_value() ||
            flush_reason.value() != FlushReason::kExternalFileIngestion ||
            cfds[i]->GetSuperVersion()->imm->GetID() ==
                cfds[i]->imm()->current()->GetID()) {
          ++num_finished;
        }
      }
    }
    // Waiting on exactly one CF that got dropped is an error; among many
    // CFs a dropped one simply counts as finished.
    if (1 == num_dropped && 1 == num) {
      s = Status::ColumnFamilyDropped();
      return s;
    }
    if (num_dropped + num_finished == num) {
      break;
    }
    bg_cv_.Wait();
  }
  // Surface the background error if the DB stopped while we waited.
  if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}
// Re-enable automatic compactions for the given column families by setting
// their "disable_auto_compactions" option back to "false".
Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  return SetOptions(column_family_handles,
                    {{"disable_auto_compactions", "false"}});
}
// Pause manual compactions and cancel all currently queued ones, blocking
// until no manual compaction is pending. Each call must be balanced by a
// later EnableManualCompaction() call.
void DBImpl::DisableManualCompaction() {
  InstrumentedMutexLock l(&mutex_);
  // Raise the pause counter first so compactions checking it will bail out.
  manual_compaction_paused_.fetch_add(1, std::memory_order_release);
  // Mark every queued manual compaction as canceled.
  for (const auto& manual_compaction : manual_compaction_dequeue_) {
    manual_compaction->canceled = true;
  }
  // Wake background threads so they observe the cancellation, then wait for
  // pending manual compactions to drain.
  bg_cv_.SignalAll();
  while (HasPendingManualCompaction()) {
    bg_cv_.Wait();
  }
}
// Undo one prior DisableManualCompaction() call by decrementing the
// manual-compaction pause counter.
void DBImpl::EnableManualCompaction() {
  InstrumentedMutexLock lock(&mutex_);
  // Calls must be balanced: never decrement below zero.
  assert(manual_compaction_paused_ > 0);
  manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
}
// Abort all running and queued compactions and block until none remain in
// flight. Compactions stay blocked until a matching ResumeAllCompactions()
// call brings the abort counter back to zero.
void DBImpl::AbortAllCompactions() {
  InstrumentedMutexLock l(&mutex_);
  // Raise the abort flag first so compaction jobs checking it will stop.
  compaction_aborted_.fetch_add(1, std::memory_order_release);
  TEST_SYNC_POINT("DBImpl::AbortAllCompactions:FlagSet");
  // Cancel every queued manual compaction as well.
  for (const auto& manual_compaction : manual_compaction_dequeue_) {
    manual_compaction->canceled = true;
  }
  // Wake background threads so they observe the abort, then wait for all
  // scheduled and manual compactions to drain.
  bg_cv_.SignalAll();
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         HasPendingManualCompaction()) {
    bg_cv_.Wait();
  }
}
// Undo one prior AbortAllCompactions() call. When the abort counter drops
// back to zero, background compaction scheduling is kicked off again.
void DBImpl::ResumeAllCompactions() {
  InstrumentedMutexLock lock(&mutex_);
  const int prev = compaction_aborted_.load(std::memory_order_acquire);
  if (prev <= 0) {
    // Unbalanced call: warn and leave the counter untouched.
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "ResumeAllCompactions called without prior "
                   "AbortAllCompactions (counter=%d)",
                   prev);
    return;
  }
  compaction_aborted_.fetch_sub(1, std::memory_order_release);
  const int now = prev - 1;
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "ResumeAllCompactions: counter %d -> %d", prev, now);
  if (now == 0) {
    // Last abort released: compactions may run again.
    MaybeScheduleFlushOrCompaction();
  }
}
// Examine pending work and schedule background flush/compaction jobs on the
// thread pools, up to the current background job limits. Caller must hold
// mutex_.
void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Start");
  if (!opened_successfully_) {
    // No background work until the DB has finished opening.
    return;
  }
  if (bg_work_paused_ > 0) {
    // All background work is explicitly paused.
    return;
  } else if (error_handler_.IsBGWorkStopped() &&
             !error_handler_.IsRecoveryInProgress()) {
    // Background work stopped by an error and no recovery is in progress.
    return;
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  // Normal case: flushes run on the HIGH-priority pool.
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule",
        &unscheduled_flushes_);
    bg_flush_scheduled_++;
    FlushThreadArg* fta = new FlushThreadArg;
    fta->db_ = this;
    fta->thread_pri_ = Env::Priority::HIGH;
    env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
                   &DBImpl::UnscheduleFlushCallback);
    --unscheduled_flushes_;
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
        &unscheduled_flushes_);
  }
  // If the HIGH pool has no threads, run flushes on the LOW-priority pool,
  // sharing its capacity with compactions (still capped by max_flushes).
  if (is_flush_pool_empty) {
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
               bg_job_limits.max_flushes) {
      bg_flush_scheduled_++;
      FlushThreadArg* fta = new FlushThreadArg;
      fta->db_ = this;
      fta->thread_pri_ = Env::Priority::LOW;
      env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleFlushCallback);
      --unscheduled_flushes_;
    }
  }
  if (bg_compaction_paused_ > 0) {
    // Compactions (but not flushes) are paused.
    return;
  } else if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
    // AbortAllCompactions() is in effect; schedule no new compactions.
    return;
  } else if (error_handler_.IsBGWorkStopped()) {
    // Never schedule compactions while a background error is set, even if
    // recovery is in progress (the flush scheduling above may still run).
    return;
  }
  if (HasExclusiveManualCompaction()) {
    // An exclusive manual compaction is running; automatic compactions
    // must wait.
    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
    return;
  }
  // Fill remaining compaction slots on the LOW-priority pool.
  while (bg_compaction_scheduled_ + bg_bottom_compaction_scheduled_ <
             bg_job_limits.max_compactions &&
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->compaction_pri_ = Env::Priority::LOW;
    ca->prepicked_compaction = nullptr;
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCompactionCallback);
  }
}
// Compute the current background job limits from the mutable DB options and
// the write controller's speed-up hint. Caller must hold mutex_.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
  mutex_.AssertHeld();
  const auto& opts = mutable_db_options_;
  return GetBGJobLimits(opts.max_background_flushes,
                        opts.max_background_compactions,
                        opts.max_background_jobs,
                        write_controller_.NeedSpeedupCompaction());
}
// Derive the flush/compaction concurrency limits. When both legacy knobs
// are -1 (auto-tuned), max_background_jobs is split roughly 1:3 between
// flushes and compactions; otherwise the explicit values are used. Every
// limit is clamped to at least 1, and compactions are serialized to a
// single slot unless parallelize_compactions is set.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
                                           int max_background_compactions,
                                           int max_background_jobs,
                                           bool parallelize_compactions) {
  BGJobLimits limits;
  const bool auto_tuned =
      max_background_flushes == -1 && max_background_compactions == -1;
  if (auto_tuned) {
    // About a quarter of the job budget goes to flushes; the remainder to
    // compactions.
    limits.max_flushes = std::max(1, max_background_jobs / 4);
    limits.max_compactions =
        std::max(1, max_background_jobs - limits.max_flushes);
  } else {
    limits.max_flushes = std::max(1, max_background_flushes);
    limits.max_compactions = std::max(1, max_background_compactions);
  }
  if (!parallelize_compactions) {
    limits.max_compactions = 1;
  }
  return limits;
}
// Append `cfd` to the compaction queue, taking a reference that is released
// when the entry is eventually processed.
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  assert(!cfd->queued_for_compaction());
  cfd->Ref();
  compaction_queue_.push_back(cfd);
  cfd->set_queued_for_compaction(true);
  ++unscheduled_compactions_;
}
// Remove and return the oldest entry of the compaction queue, clearing its
// queued flag. The reference taken at enqueue time is still held by the
// caller's returned pointer.
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  assert(!compaction_queue_.empty());
  ColumnFamilyData* front = compaction_queue_.front();
  compaction_queue_.pop_front();
  assert(front->queued_for_compaction());
  front->set_queued_for_compaction(false);
  return front;
}
// Remove and return the oldest flush request. In non-atomic-flush mode the
// request holds exactly one CF, whose queued_for_flush flag is cleared here.
DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
  assert(!flush_queue_.empty());
  FlushRequest req = std::move(flush_queue_.front());
  flush_queue_.pop_front();
  if (!immutable_db_options_.atomic_flush) {
    assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
    for (const auto& [cfd, max_mem_id] : req.cfd_to_max_mem_id_to_persist) {
      (void)max_mem_id;
      assert(cfd);
      assert(cfd->queued_for_flush());
      cfd->set_queued_for_flush(false);
    }
  }
  return req;
}
// Pop the first CF from the compaction queue for which a compaction-thread
// limiter token can be acquired. CFs that are currently throttled are
// returned to the front of the queue in their original order. Returns
// nullptr if every queued CF is throttled.
ColumnFamilyData* DBImpl::PickCompactionFromQueue(
    std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
  assert(!compaction_queue_.empty());
  assert(*token == nullptr);
  autovector<ColumnFamilyData*> deferred;
  ColumnFamilyData* picked = nullptr;
  while (!compaction_queue_.empty()) {
    ColumnFamilyData* candidate = compaction_queue_.front();
    compaction_queue_.pop_front();
    assert(candidate->queued_for_compaction());
    if (RequestCompactionToken(candidate, false, token, log_buffer)) {
      picked = candidate;
      picked->set_queued_for_compaction(false);
      break;
    }
    // Throttled by the limiter; remember it and keep scanning.
    deferred.push_back(candidate);
  }
  // Push throttled candidates back to the head of the queue; reverse
  // iteration preserves their relative order.
  for (auto it = deferred.rbegin(); it != deferred.rend(); ++it) {
    compaction_queue_.push_front(*it);
  }
  return picked;
}
// Add a flush request to the flush queue if appropriate; returns true iff
// the request was actually enqueued. A reference is taken on each CF in an
// enqueued request, to be released when the request is processed. Caller
// must hold mutex_.
bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) {
  mutex_.AssertHeld();
  bool enqueued = false;
  if (reject_new_background_jobs_) {
    // Background job intake is disabled; drop the request.
    return enqueued;
  }
  if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
    return enqueued;
  }
  if (!immutable_db_options_.atomic_flush) {
    // Non-atomic mode: exactly one CF per request. Skip it if the CF is
    // already queued or has no flush pending.
    assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
    ColumnFamilyData* cfd =
        flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
    assert(cfd);
    if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
      cfd->Ref();
      cfd->set_queued_for_flush(true);
      ++unscheduled_flushes_;
      flush_queue_.push_back(flush_req);
      enqueued = true;
    }
  } else {
    // Atomic mode: always enqueue, referencing every CF in the request.
    for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
      ColumnFamilyData* cfd = iter.first;
      cfd->Ref();
    }
    ++unscheduled_flushes_;
    flush_queue_.push_back(flush_req);
    enqueued = true;
  }
  return enqueued;
}
// Queue `cfd` for compaction if it needs one and is not already queued.
// No-op while new background jobs are being rejected. Caller must hold
// mutex_.
void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) {
  mutex_.AssertHeld();
  if (reject_new_background_jobs_) {
    return;
  }
  if (cfd->queued_for_compaction() || !cfd->NeedsCompaction()) {
    return;
  }
  TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd",
                           static_cast<void*>(cfd));
  AddToCompactionQueue(cfd);
}
// Record a file (keyed by its number) for the background purge thread to
// delete later. No-op while new background jobs are being rejected. Caller
// must hold mutex_.
//
// `fname` and `dir_to_sync` are taken by value; they are moved (not copied)
// into the purge record.
void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
                                  FileType type, uint64_t number, int job_id) {
  mutex_.AssertHeld();
  if (reject_new_background_jobs_) {
    return;
  }
  // Move the by-value strings instead of copying them into PurgeFileInfo.
  PurgeFileInfo file_info(std::move(fname), std::move(dir_to_sync), type,
                          number, job_id);
  purge_files_.insert({{number, std::move(file_info)}});
}
void DBImpl::BGWorkFlush(void* arg) {
FlushThreadArg fta = *(static_cast<FlushThreadArg*>(arg));
delete static_cast<FlushThreadArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}
// Thread-pool entry point for a LOW-priority background compaction. Takes
// ownership of the CompactionArg (deleted up front) and of the prepicked
// compaction, which is deleted after the job finishes.
void DBImpl::BGWorkCompaction(void* arg) {
  auto* ca_ptr = static_cast<CompactionArg*>(arg);
  const CompactionArg ca = *ca_ptr;
  delete ca_ptr;
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
  TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
  auto* prepicked = static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
  static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction(
      prepicked, Env::Priority::LOW);
  delete prepicked;
}
// Thread-pool entry point for a BOTTOM-priority compaction. Takes ownership
// of the CompactionArg (deleted up front) and of the prepicked compaction,
// which must exist and is deleted after the job finishes.
void DBImpl::BGWorkBottomCompaction(void* arg) {
  auto* ca_ptr = static_cast<CompactionArg*>(arg);
  const CompactionArg ca = *ca_ptr;
  delete ca_ptr;
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
  auto* prepicked = ca.prepicked_compaction;
  assert(prepicked && prepicked->compaction);
  ca.db->BackgroundCallCompaction(prepicked, Env::Priority::BOTTOM);
  delete prepicked;
}
// Thread-pool entry point for background file purging; runs on the HIGH
// priority pool.
void DBImpl::BGWorkPurge(void* db) {
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
  auto* impl = static_cast<DBImpl*>(db);
  impl->BackgroundCallPurge();
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}
// Called when a scheduled compaction job is removed from the thread pool
// without ever running. Reverses the scheduling bookkeeping and releases
// everything the CompactionArg owns.
void DBImpl::UnscheduleCompactionCallback(void* arg) {
  CompactionArg* ca_ptr = static_cast<CompactionArg*>(arg);
  Env::Priority compaction_pri = ca_ptr->compaction_pri_;
  // Undo the scheduled-job counter of the pool the job was queued on.
  if (Env::Priority::BOTTOM == compaction_pri) {
    ca_ptr->db->bg_bottom_compaction_scheduled_--;
  } else if (Env::Priority::LOW == compaction_pri) {
    ca_ptr->db->bg_compaction_scheduled_--;
  }
  CompactionArg ca = *(ca_ptr);
  delete static_cast<CompactionArg*>(arg);
  if (ca.prepicked_compaction != nullptr) {
    // A prepicked compaction never ran: report "manual compaction paused"
    // through its manual state (if any)...
    if (ca.prepicked_compaction->manual_compaction_state) {
      ca.prepicked_compaction->manual_compaction_state->done = true;
      ca.prepicked_compaction->manual_compaction_state->status =
          Status::Incomplete(Status::SubCode::kManualCompactionPaused);
    }
    // ...and release/delete the picked compaction and its input files.
    if (ca.prepicked_compaction->compaction != nullptr) {
      ca.prepicked_compaction->compaction->ReleaseCompactionFiles(
          Status::Incomplete(Status::SubCode::kManualCompactionPaused));
      delete ca.prepicked_compaction->compaction;
    }
    delete ca.prepicked_compaction;
  }
  TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
}
void DBImpl::UnscheduleFlushCallback(void* arg) {
static_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--;
Env::Priority flush_pri = static_cast<FlushThreadArg*>(arg)->thread_pri_;
if (Env::Priority::LOW == flush_pri) {
TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback");
} else if (Env::Priority::HIGH == flush_pri) {
TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback");
}
delete static_cast<FlushThreadArg*>(arg);
TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
}
// Pop work off the flush queue and run the corresponding flush(es). Called
// on a background thread with mutex_ held. Sets *made_progress on success
// and always reports the attempted flush's reason through *reason.
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer, FlushReason* reason,
                               bool* flush_rescheduled_to_retain_udt,
                               Env::Priority thread_pri) {
  mutex_.AssertHeld();
  Status status;
  *reason = FlushReason::kOthers;
  // Refuse to start if shutting down, or if background work is stopped by
  // an error and no recovery is in progress.
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
  } else if (!error_handler_.IsRecoveryInProgress()) {
    status = error_handler_.GetBGError();
  }
  if (!status.ok()) {
    return status;
  }
  autovector<BGFlushArg> bg_flush_args;
  std::vector<SuperVersionContext>& superversion_contexts =
      job_context->superversion_contexts;
  autovector<ColumnFamilyData*> column_families_not_to_flush;
  while (!flush_queue_.empty()) {
    // Every CF in the popped request carries a reference taken at enqueue
    // time; all exit paths below must release it.
    FlushRequest flush_req = PopFirstFromFlushQueue();
    FlushReason flush_reason = flush_req.flush_reason;
    // Abort if a background error arrived after this request was queued,
    // unless the flush itself is part of error recovery.
    if (!error_handler_.GetBGError().ok() && error_handler_.IsBGWorkStopped() &&
        flush_reason != FlushReason::kErrorRecovery &&
        flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
      status = error_handler_.GetBGError();
      assert(!status.ok());
      ROCKS_LOG_BUFFER(log_buffer,
                       "[JOB %d] Abort flush due to background error %s",
                       job_context->job_id, status.ToString().c_str());
      *reason = flush_reason;
      for (auto item : flush_req.cfd_to_max_mem_id_to_persist) {
        item.first->UnrefAndTryDelete();
      }
      return status;
    }
    // Re-queue the request if flushing now would persist data carrying
    // user-defined timestamps that must still be retained.
    if (!immutable_db_options_.atomic_flush &&
        ShouldRescheduleFlushRequestToRetainUDT(flush_req)) {
      assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
      ColumnFamilyData* cfd =
          flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
      if (cfd->UnrefAndTryDelete()) {
        // We held the last reference; the CF is gone and there is nothing
        // left to do.
        return Status::OK();
      }
      ROCKS_LOG_BUFFER(log_buffer,
                       "FlushRequest for column family %s is re-scheduled to "
                       "retain user-defined timestamps.",
                       cfd->GetName().c_str());
#ifndef NDEBUG
      flush_req.reschedule_count += 1;
#endif
      EnqueuePendingFlush(flush_req);
      *reason = flush_reason;
      *flush_rescheduled_to_retain_udt = true;
      return Status::TryAgain();
    }
    superversion_contexts.clear();
    superversion_contexts.reserve(
        flush_req.cfd_to_max_mem_id_to_persist.size());
    // Build per-CF flush arguments, setting aside CFs that were dropped or
    // no longer have a flush pending (their refs are released at the end).
    for (const auto& [cfd, max_memtable_id] :
         flush_req.cfd_to_max_mem_id_to_persist) {
      if (cfd->GetMempurgeUsed()) {
        // Re-issue the flush request for CFs where mempurge was used.
        cfd->imm()->FlushRequested();
      }
      if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
        column_families_not_to_flush.push_back(cfd);
        continue;
      }
      superversion_contexts.emplace_back(true);
      bg_flush_args.emplace_back(cfd, max_memtable_id,
                                 &(superversion_contexts.back()), flush_reason);
    }
    // Process one request per call: stop at the first request that produced
    // any work (or any refs to release).
    if (!bg_flush_args.empty() || !column_families_not_to_flush.empty()) {
      TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundFlush:CheckFlushRequest:cb",
                               const_cast<int*>(&flush_req.reschedule_count));
      break;
    }
  }
  if (!bg_flush_args.empty()) {
    auto bg_job_limits = GetBGJobLimits();
    for (const auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      ROCKS_LOG_BUFFER(
          log_buffer,
          "Calling FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d, compaction slots available "
          "%d, "
          "flush slots scheduled %d, compaction slots scheduled %d",
          cfd->GetName().c_str(), bg_job_limits.max_flushes,
          bg_job_limits.max_compactions, bg_flush_scheduled_,
          bg_compaction_scheduled_);
    }
    status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                         job_context, log_buffer, thread_pri);
    TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
#ifndef NDEBUG
    // All args in a batch must share one flush reason.
    for (const auto& bg_flush_arg : bg_flush_args) {
      assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
    }
#endif
    *reason = bg_flush_args[0].flush_reason_;
    // Release the references taken when the request was enqueued.
    for (auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      if (cfd->UnrefAndTryDelete()) {
        arg.cfd_ = nullptr;
      }
    }
  }
  for (auto cfd : column_families_not_to_flush) {
    cfd->UnrefAndTryDelete();
  }
  return status;
}
// Top-level driver for one background flush job: runs BackgroundFlush(),
// applies error back-off, cleans up obsolete files, and reschedules further
// background work.
void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCallFlush:start", nullptr);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
  {
    InstrumentedMutexLock l(&mutex_);
    assert(bg_flush_scheduled_);
    num_running_flushes_++;
    // Protect files this job may create from being collected as obsolete
    // while it runs.
    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));
    FlushReason reason;
    bool flush_rescheduled_to_retain_udt = false;
    Status s =
        BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason,
                        &flush_rescheduled_to_retain_udt, thread_pri);
    if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
      // The flush was re-queued to retain user-defined timestamps: wake
      // waiters, then back off briefly with the mutex released.
      bg_cv_.SignalAll(); mutex_.Unlock();
      TEST_SYNC_POINT_CALLBACK("DBImpl::AfterRetainUDTReschedule:cb", nullptr);
      immutable_db_options_.clock->SleepForMicroseconds(
          100000); mutex_.Lock();
    } else if (!s.ok() && !s.IsShutdownInProgress() &&
               !s.IsColumnFamilyDropped() &&
               reason != FlushReason::kErrorRecovery) {
      // Unexpected flush failure: count it, log, and wait one second with
      // the mutex released before continuing.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll(); mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "[JOB %d] Waiting after background flush error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      job_context.job_id, s.ToString().c_str(), error_cnt);
      log_buffer.FlushBufferToLog();
      LogFlush(immutable_db_options_.info_log);
      immutable_db_options_.clock->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
    // A rescheduled flush did no real work, so skip the obsolete-file scan.
    if (!flush_rescheduled_to_retain_udt) {
      FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                                          !s.IsColumnFamilyDropped());
    }
    // Flush the log buffer and purge obsolete files outside the mutex.
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
    assert(num_running_flushes_ > 0);
    num_running_flushes_--;
    bg_flush_scheduled_--;
    // More work may have become schedulable while this job ran.
    MaybeScheduleFlushOrCompaction();
    atomic_flush_install_cv_.SignalAll();
    bg_cv_.SignalAll();
  }
}
// Top-level driver for one background compaction job: runs
// BackgroundCompaction(), applies error back-off, purges obsolete files,
// and reschedules further background work.
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  if (bg_thread_pri == Env::Priority::BOTTOM) {
    TEST_SYNC_POINT("BackgroundCallCompaction:0:BottomPri");
  }
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);
    num_running_compactions_++;
    // Protect files this job may create from being collected as obsolete
    // while it runs.
    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));
    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
                                    prepicked_compaction, bg_thread_pri);
    TEST_SYNC_POINT("BackgroundCallCompaction:1");
    if (s.IsBusy()) {
      // Transient contention: wake waiters, then back off briefly with the
      // mutex released.
      bg_cv_.SignalAll(); mutex_.Unlock();
      immutable_db_options_.clock->SleepForMicroseconds(
          10000); mutex_.Lock();
    } else if (!s.ok() && !s.IsShutdownInProgress() &&
               !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped() &&
               !s.IsCompactionAborted()) {
      // Unexpected compaction failure: count it, log, and wait one second
      // with the mutex released before continuing.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll(); mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      LogFlush(immutable_db_options_.info_log);
      immutable_db_options_.clock->SleepForMicroseconds(1000000);
      mutex_.Lock();
    } else if (s.IsManualCompactionPaused()) {
      assert(prepicked_compaction);
      ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
      assert(m);
      ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
                       m->cfd->GetName().c_str(), job_context.job_id);
    }
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
    // An unexpected error forces a full scan for obsolete files; expected
    // interruption statuses do not.
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                                        !s.IsManualCompactionPaused() &&
                                        !s.IsCompactionAborted() &&
                                        !s.IsColumnFamilyDropped() &&
                                        !s.IsBusy());
    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
    // Flush the log buffer and purge obsolete files outside the mutex.
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
      }
      job_context.Clean();
      mutex_.Lock();
    }
    assert(num_running_compactions_ > 0);
    num_running_compactions_--;
    if (bg_thread_pri == Env::Priority::LOW) {
      bg_compaction_scheduled_--;
    } else {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      bg_bottom_compaction_scheduled_--;
    }
    // More compactions may have become eligible while this one ran.
    MaybeScheduleFlushOrCompaction();
    // Release the concurrent-task-limiter token, if one was held.
    if (prepicked_compaction != nullptr &&
        prepicked_compaction->task_token != nullptr) {
      prepicked_compaction->task_token.reset();
    }
    // Wake waiters when progress was made or no compaction work remains.
    if (made_progress ||
        (bg_compaction_scheduled_ == 0 &&
         bg_bottom_compaction_scheduled_ == 0) ||
        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
      bg_cv_.SignalAll();
    }
  }
}
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri) {
ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
: prepicked_compaction->manual_compaction_state;
*made_progress = false;
mutex_.AssertHeld();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
const ReadOptions read_options(Env::IOActivity::kCompaction);
const WriteOptions write_options(Env::IOActivity::kCompaction);
bool is_manual = (manual_compaction != nullptr);
std::unique_ptr<Compaction> c;
if (prepicked_compaction != nullptr &&
prepicked_compaction->compaction != nullptr) {
c.reset(prepicked_compaction->compaction);
}
bool is_prepicked = is_manual || c;
bool trivial_move_disallowed =
is_manual && manual_compaction->disallow_trivial_move;
CompactionJobStats compaction_job_stats;
compaction_job_stats.is_remote_compaction =
immutable_db_options().compaction_service != nullptr;
Status status;
if (!error_handler_.IsBGWorkStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kCompactionAborted);
} else if (is_manual &&
manual_compaction->canceled.load(std::memory_order_acquire)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
status = error_handler_.GetBGError();
unscheduled_compactions_++;
}
if (!status.ok()) {
if (is_manual) {
manual_compaction->status = status;
manual_compaction->done = true;
manual_compaction->in_progress = false;
manual_compaction = nullptr;
}
if (c) {
c->ReleaseCompactionFiles(status);
c.reset();
}
return status;
}
if (is_manual) {
manual_compaction->in_progress = true;
}
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress");
std::unique_ptr<TaskLimiterToken> task_token;
bool sfm_reserved_compact_space = false;
if (is_manual) {
ManualCompactionState* m = manual_compaction;
assert(m->in_progress);
if (!c) {
m->done = true;
m->manual_end = nullptr;
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Manual compaction from level-%d from %s .. "
"%s; nothing to do\n",
m->cfd->GetName().c_str(), m->input_level,
(m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
(m->end ? m->end->DebugString(true).c_str() : "(end)"));
} else {
bool enough_room = EnoughRoomForCompaction(
m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
c->ReleaseCompactionFiles(status);
c.reset();
status = Status::CompactionTooLarge();
} else {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Manual compaction from level-%d to level-%d from %s .. "
"%s; will stop at %s\n",
m->cfd->GetName().c_str(), m->input_level, c->output_level(),
(m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
(m->end ? m->end->DebugString(true).c_str() : "(end)"),
((m->done || m->manual_end == nullptr)
? "(end)"
: m->manual_end->DebugString(true).c_str()));
}
}
} else if (ShouldPickCompaction(is_prepicked, prepicked_compaction)) {
bool need_repick = is_prepicked && prepicked_compaction->need_repick;
if (HasExclusiveManualCompaction()) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
if (!need_repick) {
unscheduled_compactions_++;
return Status::OK();
}
}
ColumnFamilyData* cfd = nullptr;
if (!need_repick) {
cfd = PickCompactionFromQueue(&task_token, log_buffer);
if (cfd == nullptr) {
++unscheduled_compactions_;
return Status::Busy();
}
if (cfd->UnrefAndTryDelete()) {
return Status::OK();
}
} else {
cfd = c->column_family_data();
assert(cfd);
ResetBottomPriCompactionIntent(cfd, c);
assert(c == nullptr);
}
const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
if (!mutable_cf_options.disable_auto_compactions && !cfd->IsDropped()) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
if (cfd->ioptions().compaction_style == kCompactionStyleUniversal &&
cfd->user_comparator()->timestamp_size() == 0) {
InitSnapshotContext(job_context);
assert(is_snapshot_supported_ || snapshots_.empty());
}
c.reset(cfd->PickCompaction(
mutable_cf_options, mutable_db_options_, job_context->snapshot_seqs,
job_context->snapshot_checker, log_buffer,
thread_pri == Env::Priority::BOTTOM ));
if (thread_pri == Env::Priority::LOW) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
} else if (thread_pri == Env::Priority::BOTTOM) {
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction():AfterPickCompactionBottomPri",
c.get());
}
if (c != nullptr) {
bool enough_room = EnoughRoomForCompaction(
cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
c->ReleaseCompactionFiles(status);
c->column_family_data()
->current()
->storage_info()
->ComputeCompactionScore(c->immutable_options(),
c->mutable_cf_options(),
cfd->GetFullHistoryTsLow());
EnqueuePendingCompaction(cfd);
c.reset();
status = Status::CompactionTooLarge();
} else {
size_t num_files = 0;
for (auto& each_level : *c->inputs()) {
num_files += each_level.files.size();
}
RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
} else if (is_prepicked) {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Pre-picked compaction repicked files for compaction as "
"required, "
"but upon re-evaluation, no compaction was found necessary \n",
cfd->GetName().c_str());
}
}
}
IOStatus io_s;
bool compaction_released = false;
if (!c) {
ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
} else if (c->deletion_compaction()) {
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
assert(c->num_input_files(1) == 0);
assert(c->column_family_data()->ioptions().compaction_style ==
kCompactionStyleFIFO);
compaction_job_stats.num_input_files = c->num_input_files(0);
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
status = versions_->LogAndApply(
c->column_family_data(), read_options, write_options, c->edit(),
&mutex_, directories_.GetDbDir(),
false, nullptr,
[&c, &compaction_released](const Status& s) {
c->ReleaseCompactionFiles(s);
compaction_released = true;
});
io_s = versions_->io_status();
InstallSuperVersionAndScheduleWork(
c->column_family_data(), job_context->superversion_contexts.data());
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
if (status.ok() && io_s.ok()) {
UpdateFIFOCompactionStatus(c);
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
c->column_family_data());
} else if (c->is_trivial_copy_compaction()) {
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction:TriviaCopyBeforeCompaction",
c->column_family_data());
assert(c->num_input_files(1) == 0);
assert(c->column_family_data()->ioptions().compaction_style ==
kCompactionStyleFIFO);
assert(c->compaction_reason() == CompactionReason::kChangeTemperature);
compaction_job_stats.num_input_files = c->num_input_files(0);
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
std::vector<FileMetaData> out_files;
for (const auto& in_file : *c->inputs(0)) {
const uint64_t out_file_number = versions_->NewFileNumber();
const std::string in_fname =
TableFileName(c->immutable_options().cf_paths,
in_file->fd.GetNumber(), in_file->fd.GetPathId());
const std::string out_fname =
TableFileName(c->immutable_options().cf_paths, out_file_number,
c->output_path_id());
int64_t tmp_current_time = 0;
auto get_time_status =
immutable_db_options_.clock->GetCurrentTime(&tmp_current_time);
if (!get_time_status.ok()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] WARNING: Failed to get current time %s "
"status=%s",
c->column_family_data()->GetName().c_str(),
get_time_status.ToString().c_str());
}
uint64_t out_file_creation_time = static_cast<uint64_t>(tmp_current_time);
FileOptions copied_file_options = file_options_;
copied_file_options.temperature = c->GetOutputTemperature();
std::unique_ptr<WritableFileWriter> dest_writer;
{
std::unique_ptr<FSWritableFile> dest_file;
IOStatus writable_file_io_status =
immutable_db_options_.fs.get()->NewWritableFile(
out_fname, copied_file_options, &dest_file, nullptr );
TEST_SYNC_POINT_CALLBACK(
"NewWritableFile::FileOptions.temperature",
const_cast<Temperature*>(&copied_file_options.temperature));
if (!writable_file_io_status.ok()) {
io_s = writable_file_io_status;
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Error: Abort trivial copy compaction, failed to open "
"NewWritableFile %s\n"
" out_fname=%s, temperature=%s, io_status=%s",
c->column_family_data()->GetName().c_str(), out_fname.c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
io_s.ToString().c_str());
break;
}
FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
dest_writer.reset(new WritableFileWriter(
std::move(dest_file), out_fname, copied_file_options,
immutable_db_options_.clock, io_tracer_,
immutable_db_options_.stats, Histograms::SST_WRITE_MICROS,
c->immutable_options().listeners,
immutable_db_options_.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile), false));
}
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Started copying from: %s\n"
" temperature=%s, to: %s, temperature=%s, buffer_size=%" PRIu64,
c->column_family_data()->GetName().c_str(), in_fname.c_str(),
temperature_to_string[in_file->temperature].c_str(),
out_fname.c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
c->mutable_cf_options()
.compaction_options_fifo.trivial_copy_buffer_size);
IOOptions copy_files_compaction_io_options;
copy_files_compaction_io_options.rate_limiter_priority =
Env::IOPriority::IO_LOW;
copy_files_compaction_io_options.type = IOType::kData;
copy_files_compaction_io_options.io_activity =
Env::IOActivity::kCompaction;
IOStatus copy_file_io_status = CopyFile(
immutable_db_options_.fs.get() ,
in_fname , in_file->temperature ,
dest_writer , 0 , true ,
io_tracer_ ,
c->mutable_cf_options()
.compaction_options_fifo
.trivial_copy_buffer_size
,
copy_files_compaction_io_options ,
copy_files_compaction_io_options );
if (dest_writer) {
IOOptions close_files_compaction_io_options;
close_files_compaction_io_options.rate_limiter_priority =
Env::IOPriority::IO_LOW;
close_files_compaction_io_options.type = IOType::kData;
close_files_compaction_io_options.io_activity =
Env::IOActivity::kCompaction;
io_s = dest_writer->Close(close_files_compaction_io_options);
if (!io_s.ok()) {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Failed to close the writer. Failed to copy from: %s\n"
" temperature=%s, to=%s, temperature=%s, io_status=%s",
c->column_family_data()->GetName().c_str(), in_fname.c_str(),
temperature_to_string[in_file->temperature].c_str(),
out_fname.c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
io_s.ToString().c_str());
break;
}
}
io_s = copy_file_io_status;
if (!io_s.ok()) {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Failed to copy from: %s\n"
" temperature=%s, to=%s, temperature=%s, io_status=%s",
c->column_family_data()->GetName().c_str(), in_fname.c_str(),
temperature_to_string[in_file->temperature].c_str(),
out_fname.c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
io_s.ToString().c_str());
break;
}
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Successfully copying from: %s\n"
" temperature=%s, to=%s, temperature=%s, io_status=%s",
c->column_family_data()->GetName().c_str(),
in_fname.c_str(),
temperature_to_string[in_file->temperature].c_str(),
out_fname.c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
io_s.ToString().c_str());
FileMetaData out_file_metadata{
out_file_number,
c->output_path_id(),
in_file->fd.GetFileSize(),
in_file->smallest,
in_file->largest,
in_file->fd.smallest_seqno,
in_file->fd.largest_seqno,
false ,
c->GetOutputTemperature() ,
in_file->oldest_blob_file_number,
in_file->oldest_ancester_time,
out_file_creation_time,
c->MinInputFileEpochNumber(),
dest_writer->GetFileChecksum(),
dest_writer->GetFileChecksumFuncName(),
in_file->unique_id,
in_file->compensated_range_deletion_size,
in_file->tail_size,
in_file->user_defined_timestamps_persisted,
in_file->min_timestamp,
in_file->max_timestamp};
out_files.push_back(std::move(out_file_metadata));
}
if (status.ok() && io_s.ok()) {
assert(c->inputs(0)->size() == 1);
assert(out_files.size() == 1);
auto out_file_metadata_it = out_files.begin();
for (const auto& in_file : *c->inputs(0)) {
if (out_file_metadata_it == out_files.end()) {
break;
}
c->edit()->DeleteFile(c->level(), in_file->fd.GetNumber());
c->edit()->AddFile(c->level(), *out_file_metadata_it);
++out_file_metadata_it;
}
status = versions_->LogAndApply(
c->column_family_data(), read_options, write_options, c->edit(),
&mutex_, directories_.GetDbDir(),
false, nullptr,
[&c, &compaction_released](const Status& s) {
c->ReleaseCompactionFiles(s);
compaction_released = true;
});
}
if (io_s.ok()) {
io_s = versions_->io_status();
}
InstallSuperVersionAndScheduleWork(
c->column_family_data(), job_context->superversion_contexts.data());
if (status.ok() && io_s.ok()) {
UpdateFIFOCompactionStatus(c);
} else {
for (const auto& in_file : *c->inputs(0)) {
const std::string in_fname =
TableFileName(c->immutable_options().cf_paths,
in_file->fd.GetNumber(), in_file->fd.GetPathId());
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Failed to do trvial copy compaction: %s"
" temperature=%s, to temperature=%s, status=%s, io_status=%s",
c->column_family_data()->GetName().c_str(), in_fname.c_str(),
temperature_to_string[in_file->temperature].c_str(),
temperature_to_string[c->GetOutputTemperature()].c_str(),
status.ToString().c_str(), io_s.ToString().c_str());
}
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction:TriviaCopyAfterCompaction",
c->column_family_data());
} else if (!trivial_move_disallowed && c->IsTrivialMove()) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
ThreadStatusUtil::SetColumnFamily(c->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
compaction_job_stats.num_input_files = c->num_input_files(0);
compaction_job_stats.is_remote_compaction = false;
compaction_job_stats.num_input_files_trivially_moved =
compaction_job_stats.num_input_files;
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
if (c->compaction_reason() == CompactionReason::kLevelMaxLevelSize &&
c->immutable_options().compaction_pri == kRoundRobin) {
int start_level = c->start_level();
if (start_level > 0) {
auto vstorage = c->input_version()->storage_info();
c->edit()->AddCompactCursor(
start_level,
vstorage->GetNextCompactCursor(start_level, c->num_input_files(0)));
}
}
size_t moved_files = 0;
size_t moved_bytes = 0;
status = PerformTrivialMove(*c.get(), log_buffer, compaction_released,
moved_files, moved_bytes);
io_s = versions_->io_status();
InstallSuperVersionAndScheduleWork(
c->column_family_data(), job_context->superversion_contexts.data());
VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
moved_bytes);
{
event_logger_.LogToBuffer(log_buffer)
<< "job" << job_context->job_id << "event" << "trivial_move"
<< "destination_level" << c->output_level() << "files" << moved_files
<< "total_files_size" << moved_bytes;
}
ROCKS_LOG_BUFFER(
log_buffer, "[%s] Moved #%d files to level-%zu %zu bytes %s: %s\n",
c->column_family_data()->GetName().c_str(), moved_files,
c->output_level(), moved_bytes, status.ToString().c_str(),
c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
*made_progress = true;
ThreadStatusUtil::ResetThreadStatus();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
c->column_family_data());
} else if (!is_prepicked &&
Compaction::OutputToNonZeroMaxOutputLevel(
c->output_level(),
c->column_family_data()
->current()
->storage_info()
->MaxOutputLevel(
c->immutable_options().cf_allow_ingest_behind ||
immutable_db_options_.allow_ingest_behind)) &&
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
assert(thread_pri == Env::Priority::LOW);
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->compaction_pri_ = Env::Priority::BOTTOM;
ca->prepicked_compaction = new PrepickedCompaction;
const bool need_repick =
c->mutable_cf_options()
.compaction_options_universal.reduce_file_locking;
if (need_repick) {
ca->prepicked_compaction->compaction =
CreateIntendedCompactionForwardedToBottomPriorityPool(c.get());
c.reset();
ca->prepicked_compaction->need_repick = true;
} else {
ca->prepicked_compaction->compaction = c.release();
ca->prepicked_compaction->need_repick = false;
}
ca->prepicked_compaction->manual_compaction_state = nullptr;
ca->prepicked_compaction->task_token = std::move(task_token);
++bg_bottom_compaction_scheduled_;
assert(c == nullptr);
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
this, &DBImpl::UnscheduleCompactionCallback);
} else {
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
int output_level __attribute__((__unused__));
output_level = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
&output_level);
InitSnapshotContext(job_context);
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
mutable_db_options_, file_options_for_compaction_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
&error_handler_, job_context, table_cache_, &event_logger_,
c->mutable_cf_options().paranoid_file_checks,
c->mutable_cf_options().report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? manual_compaction->canceled
: kManualCompactionCanceledFalse_,
compaction_aborted_, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_, &bg_compaction_scheduled_,
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare(std::nullopt );
std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
min_options_file_number_elem.reset(
new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
}
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
if (thread_pri == Env::Priority::LOW) {
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
} else {
assert(thread_pri == Env::Priority::BOTTOM);
TEST_SYNC_POINT(
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRunBottomPri");
}
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
if (immutable_db_options().compaction_service != nullptr) {
ReleaseOptionsFileNumber(min_options_file_number_elem);
}
status = compaction_job.Install(&compaction_released);
io_s = compaction_job.io_status();
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
c->column_family_data(), job_context->superversion_contexts.data());
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
c->column_family_data());
}
if (status.ok() && !io_s.ok()) {
status = io_s;
} else {
io_s.PermitUncheckedError();
}
if (c != nullptr) {
if (!compaction_released) {
c->ReleaseCompactionFiles(status);
} else {
#ifndef NDEBUG
for (size_t i = 0; i < c->num_input_levels(); i++) {
for (size_t j = 0; j < c->inputs(i)->size(); j++) {
assert(!c->input(i, j)->being_compacted || !status.ok());
}
}
std::unordered_set<Compaction*>* cip = c->column_family_data()
->compaction_picker()
->compactions_in_progress();
assert(cip->find(c.get()) == cip->end());
#endif
}
*made_progress = true;
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm && sfm_reserved_compact_space) {
sfm->OnCompactionCompletion(c.get());
}
NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
}
if (status.ok() || status.IsCompactionTooLarge() ||
status.IsManualCompactionPaused() || status.IsCompactionAborted()) {
} else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (!io_s.ok()) {
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kCompaction
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
auto cfd = c->column_family_data();
assert(cfd != nullptr);
c->column_family_data()
->current()
->storage_info()
->ComputeCompactionScore(c->immutable_options(),
c->mutable_cf_options(),
cfd->GetFullHistoryTsLow());
EnqueuePendingCompaction(cfd);
}
}
c.reset();
if (is_manual) {
ManualCompactionState* m = manual_compaction;
if (!status.ok()) {
m->status = status;
m->done = true;
}
if (m->manual_end == nullptr) {
m->done = true;
}
if (!m->done) {
assert(m->cfd->ioptions().compaction_style != kCompactionStyleUniversal ||
m->cfd->ioptions().num_levels > 1);
assert(m->cfd->ioptions().compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *m->manual_end;
m->begin = &m->tmp_storage;
m->incomplete = true;
}
m->in_progress = false; }
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
return status;
}
Compaction* DBImpl::CreateIntendedCompactionForwardedToBottomPriorityPool(
    Compaction* c) {
  // Builds a lightweight placeholder compaction used when a picked compaction
  // is forwarded to the bottom-priority pool for repicking. Only the files of
  // the highest-numbered populated input level are retained (for L0, only the
  // last file in the list); the original compaction's files are released.
  auto* cfd = c->column_family_data();
  const auto& ioptions = c->immutable_options();
  const auto& moptions = c->mutable_cf_options();
  auto* vstorage = c->input_version()->storage_info();

  // Locate the highest-numbered input level that contributes any file.
  int max_input_level = 0;
  const std::vector<FileMetaData*>* max_input_level_files = nullptr;
  for (int level = static_cast<int>(c->num_input_levels()) - 1; level >= 0;
       --level) {
    if (c->num_input_files(level) > 0) {
      max_input_level = level;
      max_input_level_files = c->inputs(level);
      break;
    }
  }
  assert(max_input_level_files != nullptr);
  assert(!max_input_level_files->empty());

  std::vector<CompactionInputFiles> inputs(1);
  inputs[0].level = max_input_level;
  if (max_input_level == 0) {
    // For L0 keep just the last file of the input list.
    inputs[0].files.push_back(max_input_level_files->back());
  } else {
    inputs[0].files = *max_input_level_files;
  }

  // Hand the originally picked files back before registering the new intent.
  c->ReleaseCompactionFiles(Status::OK());
  Compaction* intended_compaction = new Compaction(
      vstorage, ioptions, moptions, mutable_db_options_, std::move(inputs),
      c->output_level(), c->target_output_file_size(),
      c->max_compaction_bytes(), c->output_path_id(), c->output_compression(),
      c->output_compression_opts(), c->GetOutputTemperature(),
      c->max_subcompactions(), c->grandparents(), std::nullopt, nullptr,
      c->compaction_reason());
  cfd->compaction_picker()->RegisterCompaction(intended_compaction);
  vstorage->ComputeCompactionScore(ioptions, moptions,
                                   cfd->GetFullHistoryTsLow());
  intended_compaction->FinalizeInputInfo(cfd->current());
  return intended_compaction;
}
bool DBImpl::HasPendingManualCompaction() {
  // True when at least one manual compaction is queued (running or waiting).
  return manual_compaction_dequeue_.size() > 0;
}
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  // Manual compactions must not be registered while they are globally paused.
  assert(manual_compaction_paused_ == 0);
  manual_compaction_dequeue_.emplace_back(m);
}
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  // Erase `m` from the queue; it is a bug for the caller to pass a state
  // object that is not currently queued.
  for (auto it = manual_compaction_dequeue_.begin();
       it != manual_compaction_dequeue_.end(); ++it) {
    if (*it == m) {
      manual_compaction_dequeue_.erase(it);
      return;
    }
  }
  assert(false);  // `m` was expected to be in manual_compaction_dequeue_
}
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  // An exclusive manual compaction must wait until no background compaction
  // of any priority is scheduled.
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
  // A non-exclusive one must yield to any overlapping manual compaction that
  // is queued ahead of it (i.e. seen before `m` in the deque) and has not
  // started running yet.
  bool passed_self = false;
  for (ManualCompactionState* other : manual_compaction_dequeue_) {
    if (other == m) {
      passed_self = true;
      continue;
    }
    if (MCOverlap(m, other) && (!passed_self && !other->in_progress)) {
      return true;
    }
  }
  return false;
}
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // True if any exclusive manual compaction is queued, or if this column
  // family has a queued manual compaction that is neither running nor done.
  for (ManualCompactionState* mc : manual_compaction_dequeue_) {
    if (mc->exclusive) {
      return true;
    }
    if (mc->cfd == cfd && !(mc->in_progress || mc->done)) {
      return true;
    }
  }
  return false;
}
bool DBImpl::HasExclusiveManualCompaction() {
std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
return true;
}
++it;
}
return false;
}
// Returns true iff two manual compactions may conflict and thus must not run
// concurrently. Currently only exclusivity forces a conflict: an exclusive
// manual compaction overlaps with everything. Non-exclusive manual
// compactions are optimistically treated as non-overlapping, even within the
// same column family — key-range overlap is not checked here. (The previous
// `m->cfd != m1->cfd` test was dead code: both of its outcomes returned
// false. If real range-overlap detection is added later, that is where the
// same-CF case would be handled.)
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  return m->exclusive || m1->exclusive;
}
void DBImpl::UpdateFIFOCompactionStatus(const std::unique_ptr<Compaction>& c) {
if (c == nullptr) {
return;
}
CompactionReason reason = c->compaction_reason();
switch (reason) {
case CompactionReason::kFIFOMaxSize:
RecordTick(stats_, FIFO_MAX_SIZE_COMPACTIONS);
break;
case CompactionReason::kFIFOTtl:
RecordTick(stats_, FIFO_TTL_COMPACTIONS);
break;
case CompactionReason::kChangeTemperature:
RecordTick(stats_, FIFO_CHANGE_TEMPERATURE_COMPACTIONS);
break;
default:
assert(false);
break;
}
}
// Populates `compaction_job_info` for listener callbacks: job identity and
// outcome, per-level input files, output files recorded in the version edit,
// merged input/output table properties, and blob file additions/garbage.
// `compaction_job_info` must be non-null; `cfd`/`c` describe the finished job.
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->aborted = st.IsCompactionAborted();
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  // Merge input and output table properties into a single map.
  const auto& input_table_properties = c->GetOrInitInputTableProperties();
  const auto& output_table_properties = c->GetOutputTableProperties();
  compaction_job_info->table_properties.insert(input_table_properties.begin(),
                                               input_table_properties.end());
  compaction_job_info->table_properties.insert(output_table_properties.begin(),
                                               output_table_properties.end());
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  // (A local `ReadOptions read_options(Env::IOActivity::kCompaction)` used to
  // be declared here but was never referenced; removed.)
  // Record every input file, per level.
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_options().cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
    }
  }
  // Output files are whatever the version edit registered as new files.
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_options().cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
  compaction_job_info->blob_compression_type =
      c->mutable_cf_options().blob_compression_type;
  // Blob files added by this compaction.
  for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
    BlobFileAdditionInfo blob_file_addition_info(
        BlobFileName(c->immutable_options().cf_paths.front().path,
                     blob_file.GetBlobFileNumber()),
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
        blob_file.GetTotalBlobBytes());
    compaction_job_info->blob_file_addition_infos.emplace_back(
        std::move(blob_file_addition_info));
  }
  // Blob files whose garbage grew because of this compaction.
  for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) {
    BlobFileGarbageInfo blob_file_garbage_info(
        BlobFileName(c->immutable_options().cf_paths.front().path,
                     blob_file.GetBlobFileNumber()),
        blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(),
        blob_file.GetGarbageBlobBytes());
    compaction_job_info->blob_file_garbage_infos.emplace_back(
        std::move(blob_file_garbage_info));
  }
}
// Installs a new SuperVersion for `cfd` (allocating one if the caller's
// pre-allocated SuperVersion was already consumed), refreshes the global
// file-marking thresholds across all column families, schedules any newly
// needed background work, and re-balances the tracked memtable memory budget.
// Requires the DB mutex to be held.
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    std::optional<std::shared_ptr<SeqnoToTimeMapping>>
        new_seqno_to_time_mapping) {
  mutex_.AssertHeld();

  // Remember the memtable budget charged under the previous SuperVersion so
  // max_total_in_memory_state_ can be adjusted after the swap below.
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // The caller may have used up the pre-allocated SuperVersion already.
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_,
                           std::move(new_seqno_to_time_mapping));

  // Recompute the global marking thresholds over *all* column families, since
  // this CF's current Version just changed.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  standalone_range_deletion_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    if (!my_cfd->AllowIngestBehind()) {
      bottommost_files_mark_threshold_ = std::min(
          bottommost_files_mark_threshold_,
          my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
    }
    // BUGFIX: read the loop's column family (`my_cfd`), not the `cfd`
    // parameter. Previously every iteration re-read the same CF, so
    // standalone range-deletion thresholds of other CFs were ignored.
    standalone_range_deletion_files_mark_threshold_ =
        std::min(standalone_range_deletion_files_mark_threshold_,
                 my_cfd->current()
                     ->storage_info()
                     ->standalone_range_tombstone_files_mark_threshold());
  }

  // Installing a new Version can change whether compaction is needed.
  EnqueuePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Swap this CF's contribution to the global memtable memory budget from the
  // old options to the latest ones.
  max_total_in_memory_state_ =
      max_total_in_memory_state_ - old_memtable_size +
      cfd->GetLatestMutableCFOptions().write_buffer_size *
          cfd->GetLatestMutableCFOptions().max_write_buffer_number;
}
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  // A file may be purged only if no background job has already grabbed it
  // and it is not already queued for purging.
  const bool already_grabbed = files_grabbed_for_purge_.find(file_number) !=
                               files_grabbed_for_purge_.end();
  const bool already_queued =
      purge_files_.find(file_number) != purge_files_.end();
  return !already_grabbed && !already_queued;
}
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  // Claim this file for the current purge job so ShouldPurge() rejects it
  // for any other job.
  files_grabbed_for_purge_.emplace(file_number);
}
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  // Takes ownership of `snapshot_checker`. May only be installed once.
  InstrumentedMutexLock lock_guard(&mutex_);
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}
void DBImpl::InitSnapshotContext(JobContext* job_context) {
  // Lazily captures the snapshot state a background job needs: the snapshot
  // checker, an optional pinned snapshot, the earliest write-conflict
  // snapshot, and the full list of live snapshot sequence numbers.
  // Idempotent per JobContext; requires the DB mutex to be held.
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  if (job_context->snapshot_context_initialized) {
    return;  // already captured for this job
  }
  SnapshotChecker* checker = snapshot_checker_.get();
  if (checker == nullptr && use_custom_gc_) {
    checker = DisableGCSnapshotChecker::Instance();
  }
  // When a checker is active, pin a snapshot for the job's lifetime.
  std::unique_ptr<ManagedSnapshot> snapshot_guard;
  if (checker != nullptr) {
    snapshot_guard =
        std::make_unique<ManagedSnapshot>(this, GetSnapshotImpl(false, false));
  }
  SequenceNumber earliest_write_conflict = kMaxSequenceNumber;
  std::vector<SequenceNumber> live_snapshots =
      snapshots_.GetAll(&earliest_write_conflict);
  job_context->InitSnapshotContext(checker, std::move(snapshot_guard),
                                   earliest_write_conflict,
                                   std::move(live_snapshots));
}
// Blocks until all currently running and queued background work (flushes,
// compactions, and — when requested — purges) is finished, subject to the
// options: optional up-front flush, optional timeout, optional abort when
// background work is paused, and optional DB close once idle.
// Holds the DB mutex except while waiting on the condvar or closing the DB.
Status DBImpl::WaitForCompact(
    const WaitForCompactOptions& wait_for_compact_options) {
  InstrumentedMutexLock l(&mutex_);
  if (wait_for_compact_options.flush) {
    // Flush first so the wait below also covers any compactions the flush
    // itself triggers.
    Status s = DBImpl::FlushAllColumnFamilies(FlushOptions(),
                                              FlushReason::kManualFlush);
    if (!s.ok()) {
      return s;
    }
  } else if (wait_for_compact_options.close_db &&
             has_unpersisted_data_.load(std::memory_order_relaxed) &&
             !mutable_db_options_.avoid_flush_during_shutdown) {
    // Closing with unpersisted data: flush as a shutdown flush unless the
    // options ask to avoid flushing during shutdown.
    Status s =
        DBImpl::FlushAllColumnFamilies(FlushOptions(), FlushReason::kShutDown);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_SYNC_POINT("DBImpl::WaitForCompact:StartWaiting");
  // Absolute deadline in microseconds; only meaningful when a timeout was
  // given (checked below before use).
  const auto deadline = immutable_db_options_.clock->NowMicros() +
                        wait_for_compact_options.timeout.count();
  for (;;) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
    if (bg_work_paused_ && wait_for_compact_options.abort_on_pause) {
      return Status::Aborted();
    }
    // Keep waiting while any background work is running or queued (and no
    // background error has been raised). IsRecoveryInProgress() keeps us
    // waiting through auto-recovery, which may schedule more work.
    if ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || unscheduled_compactions_ ||
         (wait_for_compact_options.wait_for_purge && bg_purge_scheduled_) ||
         unscheduled_flushes_ || error_handler_.IsRecoveryInProgress()) &&
        (error_handler_.GetBGError().ok())) {
      if (wait_for_compact_options.timeout.count()) {
        // TimedWait returns true on timeout expiry.
        if (bg_cv_.TimedWait(deadline)) {
          return Status::TimedOut();
        }
      } else {
        TEST_SYNC_POINT("DBImpl::WaitForCompact:InsideLoop");
        bg_cv_.Wait();
      }
    } else if (wait_for_compact_options.close_db) {
      // Idle and asked to close: reject new background jobs, then close
      // without holding the mutex. On failure, allow jobs again.
      reject_new_background_jobs_ = true;
      mutex_.Unlock();
      Status s = Close();
      mutex_.Lock();
      if (!s.ok()) {
        reject_new_background_jobs_ = false;
      }
      return s;
    } else {
      // Idle: report whatever background error state remains (OK if none).
      return error_handler_.GetBGError();
    }
  }
}
bool DBImpl::ShouldPickCompaction(
    bool is_prepicked, const PrepickedCompaction* prepicked_compaction) {
  // A prepicked job only needs (re)picking when it was flagged for repick;
  // otherwise pick whenever the compaction queue has work.
  // (`prepicked_compaction` is only dereferenced when `is_prepicked`.)
  if (is_prepicked) {
    return prepicked_compaction->need_repick;
  }
  return !compaction_queue_.empty();
}
void DBImpl::ResetBottomPriCompactionIntent(ColumnFamilyData* cfd,
                                            std::unique_ptr<Compaction>& c) {
  // Abandon a compaction forwarded to the bottom-priority pool: return its
  // picked files, refresh the CF's compaction scores so the work can be
  // repicked, and destroy the compaction object (leaving `c` null).
  c->ReleaseCompactionFiles(Status::OK());
  auto* vstorage = cfd->current()->storage_info();
  vstorage->ComputeCompactionScore(c->immutable_options(),
                                   c->mutable_cf_options(),
                                   cfd->GetFullHistoryTsLow());
  c.reset();
}
}