#include "db/db_impl/db_impl_secondary.h"
#include <cinttypes>
#include "db/arena_wrapped_db_iter.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/merge_context.h"
#include "db/version_edit.h"
#include "file/filename.h"
#include "file/writable_file_writer.h"
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {
// Construct a DBImpl configured for secondary mode; the trailing constructor
// flags presumably mark the instance read-only — confirm against DBImpl's
// constructor signature.
DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
                                 const std::string& dbname,
                                 std::string secondary_path)
    : DBImpl(db_options, dbname, false, true, true),
      secondary_path_(std::move(secondary_path)) {
  // Announce secondary-mode startup and push the message out immediately.
  auto& info_log = immutable_db_options_.info_log;
  ROCKS_LOG_INFO(info_log, "Opening the db in secondary mode");
  LogFlush(info_log);
}
DBImplSecondary::~DBImplSecondary() = default;
// Secondary-mode recovery: replays the shared MANIFEST via ReactiveVersionSet
// and then catches up with any WAL data written by the primary. Several of
// the usual recovery knobs are irrelevant in secondary mode and arrive
// unnamed. Requires: DB mutex held by the caller.
Status DBImplSecondary::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    bool /*unused*/, bool /*unused*/,
    bool /*unused*/, bool /*unused*/, uint64_t* /*unused*/,
    RecoveryContext* /*unused*/, bool* /*unused*/) {
  mutex_.AssertHeld();

  JobContext job_context(0);
  Status s;
  // Recover the column family set from the MANIFEST; the manifest
  // reader/reporter/status are retained as members so TryCatchUpWithPrimary()
  // can keep tailing the same file later.
  s = static_cast<ReactiveVersionSet*>(versions_.get())
          ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
                    &manifest_reader_status_);
  if (!s.ok()) {
    if (manifest_reader_status_) {
      // Recovery already failed; don't let the unchecked reader status
      // trigger an assertion on destruction.
      manifest_reader_status_->PermitUncheckedError();
    }
    return s;
  }
  // Recompute the memtable memory budget from the recovered CF options.
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options.write_buffer_size *
                                  mutable_cf_options.max_write_buffer_number;
  }
  if (s.ok()) {
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    std::unordered_set<ColumnFamilyData*> cfds_changed;
    // Replay whatever WAL files the primary has left behind.
    s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  }
  if (s.IsPathNotFound()) {
    // The primary may purge WALs at any time; that is not an error for a
    // secondary — the data will be picked up from SST files instead.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Secondary tries to read WAL, but WAL file(s) have already "
                   "been purged by primary.");
    s = Status::OK();
  }
  job_context.Clean();
  return s;
}
// Locate WAL files not yet fully consumed by this secondary and replay them
// into the memtables. Returns OK without replaying when nothing new exists.
Status DBImplSecondary::FindAndRecoverLogFiles(
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(cfds_changed != nullptr);
  assert(job_context != nullptr);
  std::vector<uint64_t> wal_numbers;
  Status s = FindNewLogNumbers(&wal_numbers);
  if (!s.ok() || wal_numbers.empty()) {
    return s;
  }
  SequenceNumber next_sequence(kMaxSequenceNumber);
  return RecoverLogFiles(wal_numbers, &next_sequence, cfds_changed,
                         job_context);
}
// Scan the WAL directory and collect, sorted ascending, the numbers of all
// WAL files this secondary may still need to replay (those at or after the
// oldest WAL for which a reader is retained).
Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
  assert(logs != nullptr);
  IOOptions io_opts;
  io_opts.do_not_recurse = true;
  std::vector<std::string> filenames;
  Status s = immutable_db_options_.fs->GetChildren(
      immutable_db_options_.GetWalDir(), io_opts, &filenames, nullptr);
  if (s.IsNotFound()) {
    // A missing wal_dir is a configuration problem, not a transient state.
    return Status::InvalidArgument("Failed to open wal_dir",
                                   immutable_db_options_.GetWalDir());
  }
  if (!s.ok()) {
    return s;
  }
  // WALs older than the oldest retained reader have been fully consumed.
  const uint64_t log_number_min =
      log_readers_.empty() ? 0 : log_readers_.begin()->first;
  for (const std::string& fname : filenames) {
    uint64_t number = 0;
    FileType type;
    if (ParseFileName(fname, &number, &type) && type == kWalFile &&
        number >= log_number_min) {
      logs->push_back(number);
    }
  }
  std::sort(logs->begin(), logs->end());
  return s;
}
// Ensure a FragmentBufferedReader exists in log_readers_ for `log_number`,
// creating one (or replacing a stale cache entry) as needed, and return it
// via *log_reader. On failure to open the WAL file, *log_reader is set to
// nullptr and the open error is returned.
Status DBImplSecondary::MaybeInitLogReader(
    uint64_t log_number, log::FragmentBufferedReader** log_reader) {
  auto iter = log_readers_.find(log_number);
  // The cached reader is unusable if absent, or if the reader it holds was
  // built for a different log than the map key claims; rebuild either way.
  if (iter == log_readers_.end() ||
      iter->second->reader_->GetLogNumber() != log_number) {
    if (iter != log_readers_.end()) {
      log_readers_.erase(iter);
    }
    std::string fname =
        LogFileName(immutable_db_options_.GetWalDir(), log_number);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", log_number,
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));

    std::unique_ptr<SequentialFileReader> file_reader;
    {
      std::unique_ptr<FSSequentialFile> file;
      Status status = fs_->NewSequentialFile(
          fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
      if (!status.ok()) {
        *log_reader = nullptr;
        return status;
      }
      file_reader.reset(new SequentialFileReader(
          std::move(file), fname, immutable_db_options_.log_readahead_size,
          io_tracer_));
    }

    // The container owns the reader together with its reporter/status; the
    // map owns the container.
    LogReaderContainer* log_reader_container = new LogReaderContainer(
        env_, immutable_db_options_.info_log, std::move(fname),
        std::move(file_reader), log_number);
    log_readers_.insert(std::make_pair(
        log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
  }
  iter = log_readers_.find(log_number);
  assert(iter != log_readers_.end());
  *log_reader = iter->second->reader_;
  return Status::OK();
}
// Replay the given WAL files (expected in ascending order) into this
// secondary's memtables. For each write batch: batches already covered by
// flushed L0 output are skipped, a column family switches to a fresh
// memtable when its non-empty memtable was populated from a different log,
// and the version set's sequence numbers are advanced after a successful
// insert. Readers for all but the most recent log are dropped at the end,
// since only the newest WAL may still be appended to by the primary.
// Requires: DB mutex held.
Status DBImplSecondary::RecoverLogFiles(
    const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(nullptr != cfds_changed);
  assert(nullptr != job_context);
  mutex_.AssertHeld();
  Status status;
  // First pass: make sure a reader exists for every log before replaying any.
  for (auto log_number : log_numbers) {
    log::FragmentBufferedReader* reader = nullptr;
    status = MaybeInitLogReader(log_number, &reader);
    if (!status.ok()) {
      return status;
    }
    assert(reader != nullptr);
  }
  // Per-CF user-timestamp sizes, used to validate each batch's recorded
  // timestamp sizes against the live schema.
  const UnorderedMap<uint32_t, size_t>& running_ts_sz =
      versions_->GetRunningColumnFamiliesTimestampSize();
  for (auto log_number : log_numbers) {
    auto it = log_readers_.find(log_number);
    assert(it != log_readers_.end());
    log::FragmentBufferedReader* reader = it->second->reader_;
    Status* wal_read_status = it->second->status_;
    assert(wal_read_status);
    // Prevent this WAL number from being reused.
    versions_->MarkFileNumberUsed(log_number);
    std::string scratch;
    Slice record;
    WriteBatch batch;
    while (reader->ReadRecord(&record, &scratch,
                              immutable_db_options_.wal_recovery_mode) &&
           wal_read_status->ok() && status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reader->GetReporter()->Corruption(
            record.size(), Status::Corruption("log record too small"));
        continue;
      }
      status = WriteBatchInternal::SetContents(&batch, record);
      if (!status.ok()) {
        break;
      }
      const UnorderedMap<uint32_t, size_t>& record_ts_sz =
          reader->GetRecordedTimestampSize();
      status = HandleWriteBatchTimestampSizeDifference(
          &batch, running_ts_sz, record_ts_sz,
          TimestampSizeConsistencyMode::kVerifyConsistency, seq_per_batch_,
          batch_per_txn_);
      if (!status.ok()) {
        break;
      }
      SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
      std::vector<uint32_t> column_family_ids;
      status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            // The column family may have been dropped; skip its updates.
            continue;
          }
          cfds_changed->insert(cfd);
          const std::vector<FileMetaData*>& l0_files =
              cfd->current()->storage_info()->LevelFiles(0);
          SequenceNumber seq =
              l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
          // Data at or below the newest persisted L0 seqno is already in
          // SST files; do not re-insert it into the memtable.
          if (seq_of_batch <= seq) {
            continue;
          }
          auto curr_log_num = std::numeric_limits<uint64_t>::max();
          if (cfd_to_current_log_.count(cfd) > 0) {
            curr_log_num = cfd_to_current_log_[cfd];
          }
          // If this CF's non-empty memtable was filled from a different
          // (older) WAL, seal it into the immutable list and swap in a
          // fresh one, mirroring the primary's memtable switches.
          if (!cfd->mem()->IsEmpty() &&
              (curr_log_num == std::numeric_limits<uint64_t>::max() ||
               curr_log_num != log_number)) {
            MemTable* new_mem = cfd->ConstructNewMemtable(
                cfd->GetLatestMutableCFOptions(), seq_of_batch);
            cfd->mem()->SetNextLogNumber(log_number);
            cfd->mem()->ConstructFragmentedRangeTombstones();
            cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
            new_mem->Ref();
            cfd->SetMemtable(new_mem);
          }
        }
        bool has_valid_writes = false;
        status = WriteBatchInternal::InsertInto(
            &batch, column_family_memtables_.get(),
            nullptr, nullptr,
            true, log_number, this, false,
            next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
      }
      if (status.ok()) {
        // Remember, per CF, the most recent log that contributed writes.
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            continue;
          }
          std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
              cfd_to_current_log_.find(cfd);
          if (iter == cfd_to_current_log_.end()) {
            cfd_to_current_log_.insert({cfd, log_number});
          } else if (log_number > iter->second) {
            iter->second = log_number;
          }
        }
        // Publish the sequence advance only when the batch actually moved
        // it forward.
        auto last_sequence = *next_sequence - 1;
        if ((*next_sequence != kMaxSequenceNumber) &&
            (versions_->LastSequence() <= last_sequence)) {
          versions_->SetLastAllocatedSequence(last_sequence);
          versions_->SetLastPublishedSequence(last_sequence);
          versions_->SetLastSequence(last_sequence);
        }
      } else {
        // Replay of this batch failed; surface it through the reporter.
        reader->GetReporter()->Corruption(record.size(), status);
      }
    }
    // A read-side failure recorded by the reporter trumps an OK replay.
    if (status.ok() && !wal_read_status->ok()) {
      status = *wal_read_status;
    }
    if (!status.ok()) {
      wal_read_status->PermitUncheckedError();
      return status;
    }
  }
  // Only the most recent log may still receive appends from the primary;
  // drop readers for all earlier, fully-consumed logs.
  if (log_readers_.size() > 1) {
    auto erase_iter = log_readers_.begin();
    std::advance(erase_iter, log_readers_.size() - 1);
    log_readers_.erase(log_readers_.begin(), erase_iter);
  }
  return status;
}
// Point lookup against the secondary's current view: probes the active
// memtable, then the immutable memtables, then SST files, always reading at
// the latest sequence number this secondary has caught up to (user snapshots
// are not supported in secondary mode). Depending on get_impl_options this
// serves regular value reads, wide-column reads, or merge-operand retrieval.
Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
                                const Slice& key,
                                GetImplOptions& get_impl_options) {
  assert(get_impl_options.value != nullptr ||
         get_impl_options.columns != nullptr ||
         get_impl_options.merge_operands != nullptr);
  assert(get_impl_options.column_family);

  Status s;
  // Validate the read timestamp against the CF's comparator (or require its
  // absence when the CF does not use timestamps).
  if (read_options.timestamp) {
    s = FailIfTsMismatchCf(get_impl_options.column_family,
                           *(read_options.timestamp));
    if (!s.ok()) {
      return s;
    }
  } else {
    s = FailIfCfHasTs(get_impl_options.column_family);
    if (!s.ok()) {
      return s;
    }
  }
  if (get_impl_options.timestamp) {
    get_impl_options.timestamp->clear();
  }

  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  const Comparator* ucmp = get_impl_options.column_family->GetComparator();
  assert(ucmp);
  // Only fill the output timestamp for timestamp-aware column families.
  std::string* ts =
      ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
  // Read at the most recent sequence known to this secondary.
  SequenceNumber snapshot = versions_->LastSequence();
  GetWithTimestampReadCallback read_cb(snapshot);
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
      get_impl_options.column_family);
  auto cfd = cfh->cfd();
  if (tracer_) {
    // Re-check under trace_mutex_ since tracing may stop concurrently.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(get_impl_options.column_family, key);
    }
  }
  // Referenced super-version must be returned on every exit path below.
  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    s = FailIfReadCollapsedHistory(cfd, super_version,
                                   *(read_options.timestamp));
    if (!s.ok()) {
      ReturnAndCleanupSuperVersion(cfd, super_version);
      return s;
    }
  }
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  // done == true means the lookup was fully resolved in the memtables and
  // the SST search can be skipped.
  bool done = false;
  if (get_impl_options.get_value) {
    // Regular Get: active memtable first, then the immutable list.
    if (super_version->mem->Get(
            lkey,
            get_impl_options.value ? get_impl_options.value->GetSelf()
                                   : nullptr,
            get_impl_options.columns, ts, &s, &merge_context,
            &max_covering_tombstone_seq, read_options,
            false, &read_cb,
            nullptr, true)) {
      done = true;
      if (get_impl_options.value) {
        get_impl_options.value->PinSelf();
      }
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               super_version->imm->Get(
                   lkey,
                   get_impl_options.value ? get_impl_options.value->GetSelf()
                                          : nullptr,
                   get_impl_options.columns, ts, &s, &merge_context,
                   &max_covering_tombstone_seq, read_options, &read_cb)) {
      done = true;
      if (get_impl_options.value) {
        get_impl_options.value->PinSelf();
      }
      RecordTick(stats_, MEMTABLE_HIT);
    }
  } else {
    // GetMergeOperands path: collect operands instead of the merged value.
    if (super_version->mem->Get(
            lkey,
            get_impl_options.value ? get_impl_options.value->GetSelf()
                                   : nullptr,
            get_impl_options.columns, ts, &s, &merge_context,
            &max_covering_tombstone_seq, read_options,
            false, &read_cb,
            nullptr, false)) {
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               super_version->imm->GetMergeOperands(lkey, &s, &merge_context,
                                                    &max_covering_tombstone_seq,
                                                    read_options)) {
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    }
  }
  // Any error other than NotFound/MergeInProgress aborts the lookup.
  if (!s.ok() && !s.IsMergeInProgress() && !s.IsNotFound()) {
    assert(done);
    ReturnAndCleanupSuperVersion(cfd, super_version);
    return s;
  }
  if (!done) {
    // Not resolved in the memtables: consult the SST files.
    PERF_TIMER_GUARD(get_from_output_files_time);
    PinnedIteratorsManager pinned_iters_mgr;
    super_version->current->Get(
        read_options, lkey, get_impl_options.value, get_impl_options.columns,
        ts, &s, &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
        nullptr,
        nullptr, nullptr, &read_cb, nullptr,
        true);
    RecordTick(stats_, MEMTABLE_MISS);
  }
  {
    // Post-processing: release the super-version and update read statistics.
    PERF_TIMER_GUARD(get_post_process_time);
    ReturnAndCleanupSuperVersion(cfd, super_version);
    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (get_impl_options.value) {
      size = get_impl_options.value->size();
    } else if (get_impl_options.columns) {
      size = get_impl_options.columns->serialized_size();
    } else if (get_impl_options.merge_operands) {
      // Pin each collected operand into the caller-provided slice array.
      *get_impl_options.number_of_operands =
          static_cast<int>(merge_context.GetNumOperands());
      for (const Slice& sl : merge_context.GetOperands()) {
        size += sl.size();
        get_impl_options.merge_operands->PinSelf(sl);
        get_impl_options.merge_operands++;
      }
    }
    RecordTick(stats_, BYTES_READ, size);
    RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);
  }
  return s;
}
// Return a forward iterator over `column_family`. Secondary mode restricts
// the options: no persisted-tier reads, no tailing iterators, and no
// user-supplied snapshots; the iterator always reads at the latest sequence
// this secondary has caught up to.
Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
                                       ColumnFamilyHandle* column_family) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return NewErrorIterator(Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  assert(column_family);
  // Timestamp expectations must match the column family's comparator.
  const Status ts_check =
      read_options.timestamp
          ? FailIfTsMismatchCf(column_family, *(read_options.timestamp))
          : FailIfCfHasTs(column_family);
  if (!ts_check.ok()) {
    return NewErrorIterator(ts_check);
  }
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  assert(cfh != nullptr);
  auto cfd = cfh->cfd();
  if (read_options.tailing) {
    return NewErrorIterator(Status::NotSupported(
        "tailing iterator not supported in secondary mode"));
  }
  if (read_options.snapshot != nullptr) {
    return NewErrorIterator(
        Status::NotSupported("snapshot not supported in secondary mode"));
  }
  SequenceNumber snapshot(kMaxSequenceNumber);
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    const Status s =
        FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
    if (!s.ok()) {
      CleanupSuperVersion(sv);
      return NewErrorIterator(s);
    }
  }
  return NewIteratorImpl(read_options, cfh, sv, snapshot, nullptr);
}
// Build the arena-wrapped iterator shared by NewIterator/NewIterators.
// Callers pass kMaxSequenceNumber as a sentinel; the actual read sequence is
// always the latest one this secondary has caught up to.
ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
    const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
    SuperVersion* super_version, SequenceNumber snapshot,
    ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) {
  assert(cfh != nullptr);
  assert(snapshot == kMaxSequenceNumber);
  const SequenceNumber read_seq = versions_->LastSequence();
  assert(read_seq != kMaxSequenceNumber);
  return NewArenaWrappedDbIterator(env_, read_options, cfh, super_version,
                                   read_seq, read_callback, this,
                                   expose_blob_index, allow_refresh,
                                   false);
}
// Create one iterator per requested column family, all reading at the same
// (latest) sequence number known to this secondary. Tailing iterators and
// explicit snapshots are not supported in secondary mode. On any failure no
// iterators are returned and every already-acquired super-version is
// released.
Status DBImplSecondary::NewIterators(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return Status::InvalidArgument(
        "Can only call NewIterators with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  // Secondary iterators never use a read callback.
  ReadCallback* read_callback = nullptr;
  if (iterators == nullptr) {
    return Status::InvalidArgument("iterators not allowed to be nullptr");
  }
  // Validate timestamp usage against every CF's comparator before doing any
  // real work.
  if (read_options.timestamp) {
    for (auto* cf : column_families) {
      assert(cf);
      const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
      if (!s.ok()) {
        return s;
      }
    }
  } else {
    for (auto* cf : column_families) {
      assert(cf);
      const Status s = FailIfCfHasTs(cf);
      if (!s.ok()) {
        return s;
      }
    }
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  if (read_options.tailing) {
    return Status::NotSupported(
        "tailing iterator not supported in secondary mode");
  } else if (read_options.snapshot != nullptr) {
    return Status::NotSupported("snapshot not supported in secondary mode");
  } else {
    SequenceNumber read_seq(kMaxSequenceNumber);
    // Acquire a super-version per CF first so that a failed history check
    // can release exactly the ones acquired so far.
    autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
    const bool check_read_ts =
        read_options.timestamp && read_options.timestamp->size() > 0;
    for (auto cf : column_families) {
      auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
      auto cfd = cfh->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
      cfh_to_sv.emplace_back(cfh, sv);
      if (check_read_ts) {
        const Status s =
            FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
        if (!s.ok()) {
          // Release every super-version acquired so far, including this one.
          for (auto prev_entry : cfh_to_sv) {
            CleanupSuperVersion(std::get<1>(prev_entry));
          }
          return s;
        }
      }
    }
    assert(cfh_to_sv.size() == column_families.size());
    for (auto [cfh, sv] : cfh_to_sv) {
      iterators->push_back(
          NewIteratorImpl(read_options, cfh, sv, read_seq, read_callback));
    }
  }
  return Status::OK();
}
// Bring this secondary up to date with the primary: apply new MANIFEST
// edits, replay newly-written WAL data into memtables, retire memtables the
// primary has since flushed, install fresh super-versions for each changed
// column family, and finally purge files that have become obsolete from
// this secondary's point of view.
Status DBImplSecondary::TryCatchUpWithPrimary() {
  assert(versions_.get() != nullptr);
  Status s;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  // The second argument presumably enables the superversion contexts
  // consumed below — confirm against JobContext's constructor.
  JobContext job_context(0, true);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    assert(manifest_reader_.get() != nullptr);
    // Tail the MANIFEST and apply any new version edits; cfds_changed
    // collects every column family whose state advanced.
    s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
            ->ReadAndApply(&mutex_, &manifest_reader_,
                           manifest_reader_status_.get(), &cfds_changed,
                           nullptr);
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
                   static_cast<uint64_t>(versions_->LastSequence()));
    for (ColumnFamilyData* cfd : cfds_changed) {
      if (cfd->IsDropped()) {
        ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
                        cfd->GetName().c_str());
        continue;
      }
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                      cfd->current()->storage_info()->LevelSummary(&tmp));
    }
    if (s.ok()) {
      // Replay WAL data written after the last catch-up.
      s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
      if (s.IsPathNotFound()) {
        // WAL purging by the primary is expected and benign here.
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "Secondary tries to read WAL, but WAL file(s) have already "
            "been purged by primary.");
        s = Status::OK();
      }
    }
    if (s.ok()) {
      for (auto cfd : cfds_changed) {
        // Drop immutable memtables whose contents the primary has flushed
        // (i.e. those behind the CF's current log number), then publish a
        // new super-version reflecting the updated state.
        cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
                                       &job_context.memtables_to_free);
        auto& sv_context = job_context.superversion_contexts.back();
        cfd->InstallSuperVersion(&sv_context, &mutex_);
        sv_context.NewSuperVersion();
      }
    }
  }
  job_context.Clean();

  // Separately find and purge files that are now obsolete for this
  // secondary.
  JobContext purge_files_job_context(0);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    FindObsoleteFiles(&purge_files_job_context, false);
  }
  if (purge_files_job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(purge_files_job_context);
  }
  purge_files_job_context.Clean();
  return s;
}
// Convenience overload: open a secondary instance with only the default
// column family, delegating to the multi-CF overload and discarding the
// returned default handle.
Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
                           const std::string& secondary_path,
                           std::unique_ptr<DB>* dbptr) {
  *dbptr = nullptr;
  // Split the combined Options into its DB and CF halves.
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families{
      {kDefaultColumnFamilyName, cf_options}};
  std::vector<ColumnFamilyHandle*> handles;
  const Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
                                       column_families, &handles, dbptr);
  if (s.ok()) {
    assert(handles.size() == 1);
    delete handles[0];
  }
  return s;
}
// Open the database at `dbname` as a read-only secondary instance. The
// secondary keeps its own info log under `secondary_path`, while the
// primary's files are read in place. On success *dbptr owns the new
// instance and *handles receives one handle per requested column family;
// on failure all partially-created state is torn down.
Status DB::OpenAsSecondary(
    const DBOptions& db_options, const std::string& dbname,
    const std::string& secondary_path,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr) {
  *dbptr = nullptr;

  DBOptions tmp_opts(db_options);
  Status s;
  if (nullptr == tmp_opts.info_log) {
    // The secondary writes its own info log under secondary_path so it does
    // not contend with the primary's log.
    s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
    if (!s.ok()) {
      tmp_opts.info_log = nullptr;
      return s;
    }
  }
  assert(tmp_opts.info_log != nullptr);
  if (db_options.max_open_files != -1) {
    // With a bounded table cache the secondary may need to reopen files the
    // primary has already deleted; warn loudly about the failure modes and
    // available workarounds.
    std::ostringstream oss;
    oss << "The primary instance may delete all types of files after they "
           "become obsolete. The application can coordinate the primary and "
           "secondary so that primary does not delete/rename files that are "
           "currently being used by the secondary. Alternatively, a custom "
           "Env/FS can be provided such that files become inaccessible only "
           "after all primary and secondaries indicate that they are obsolete "
           "and deleted. If the above two are not possible, you can open the "
           "secondary instance with `max_open_files==-1` so that secondary "
           "will eagerly keep all table files open. Even if a file is deleted, "
           "its content can still be accessed via a prior open file "
           "descriptor. This is a hacky workaround for only table files. If "
           "none of the above is done, then point lookup or "
           "range scan via the secondary instance can result in IOError: file "
           "not found. This can be resolved by retrying "
           "TryCatchUpWithPrimary().";
    ROCKS_LOG_WARN(tmp_opts.info_log, "%s", oss.str().c_str());
  }

  handles->clear();
  DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
  // Replace the default VersionSet with a reactive one able to tail the
  // primary's MANIFEST.
  impl->versions_.reset(new ReactiveVersionSet(
      dbname, &impl->immutable_db_options_, impl->mutable_db_options_,
      impl->file_options_, impl->table_cache_.get(),
      impl->write_buffer_manager_, &impl->write_controller_, impl->io_tracer_));
  impl->column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();

  impl->mutex_.Lock();
  s = impl->Recover(column_families, true, false, false);
  if (s.ok()) {
    // Hand back one handle per requested CF; a missing CF fails the open.
    for (const auto& cf : column_families) {
      auto cfd =
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
      if (nullptr == cfd) {
        s = Status::InvalidArgument("Column family not found", cf.name);
        break;
      }
      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
    }
  }
  SuperVersionContext sv_context(true);
  if (s.ok()) {
    // Publish an initial super-version for every recovered column family.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      sv_context.NewSuperVersion();
      cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
    }
  }
  impl->mutex_.Unlock();
  sv_context.Clean();
  if (s.ok()) {
    dbptr->reset(impl);
    for (auto h : *handles) {
      impl->NewThreadStatusCfInfo(
          static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
    }
  } else {
    // Opening failed: tear down everything created so far.
    for (auto h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
  }
  return s;
}
// Enumerate the secondary (output) directory and bucket its contents into
// scan_result: the newest compaction progress file (the one with the
// largest encoded number), older progress files, temp progress files, and
// the numbers of table files present.
Status DBImplSecondary::ScanCompactionProgressFiles(
    CompactionProgressFilesScan* scan_result) {
  assert(scan_result != nullptr);
  scan_result->Clear();

  WriteOptions write_options(Env::IOActivity::kCompaction);
  IOOptions opts;
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (!s.ok()) {
    return s;
  }
  std::vector<std::string> all_filenames;
  s = fs_->GetChildren(secondary_path_, opts, &all_filenames, nullptr);
  if (!s.ok()) {
    return s;
  }

  for (const std::string& fname : all_filenames) {
    uint64_t number = 0;
    FileType type;
    if (fname == "." || fname == ".." ||
        !ParseFileName(fname, &number, &type)) {
      continue;
    }
    switch (type) {
      case kCompactionProgressFile:
        if (number > scan_result->latest_progress_timestamp) {
          // A newer progress file demotes the previous "latest" to old.
          if (scan_result->HasLatestProgressFile()) {
            scan_result->old_progress_filenames.push_back(
                scan_result->latest_progress_filename.value());
          }
          scan_result->latest_progress_timestamp = number;
          scan_result->latest_progress_filename = fname;
        } else {
          scan_result->old_progress_filenames.push_back(fname);
        }
        break;
      case kTempFile:
        // Only temp files left behind by progress-file writing are relevant.
        if (fname.find(kCompactionProgressFileNamePrefix) == 0) {
          scan_result->temp_progress_filenames.push_back(fname);
        }
        break;
      case kTableFile:
        scan_result->table_file_numbers.push_back(number);
        break;
      default:
        break;
    }
  }
  return Status::OK();
}
// Delete the given files (relative names) from the secondary path,
// stopping at the first failure.
Status DBImplSecondary::DeleteCompactionProgressFiles(
    const std::vector<std::string>& filenames) {
  WriteOptions write_options(Env::IOActivity::kCompaction);
  IOOptions opts;
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  for (auto it = filenames.begin(); s.ok() && it != filenames.end(); ++it) {
    s = fs_->DeleteFile(secondary_path_ + "/" + *it, opts, nullptr);
  }
  return s;
}
// Delete old and temporary compaction progress files discovered by a prior
// ScanCompactionProgressFiles() call. When preserve_latest is false, the
// most recent progress file is deleted as well.
Status DBImplSecondary::CleanupOldAndTemporaryCompactionProgressFiles(
    bool preserve_latest, const CompactionProgressFilesScan& scan_result) {
  std::vector<std::string> filenames_to_delete;
  // Reserve the final size up front so the two bulk inserts (plus the
  // optional latest file) never trigger a reallocation.
  filenames_to_delete.reserve(scan_result.old_progress_filenames.size() +
                              scan_result.temp_progress_filenames.size() + 1);
  filenames_to_delete.insert(filenames_to_delete.end(),
                             scan_result.old_progress_filenames.begin(),
                             scan_result.old_progress_filenames.end());
  filenames_to_delete.insert(filenames_to_delete.end(),
                             scan_result.temp_progress_filenames.begin(),
                             scan_result.temp_progress_filenames.end());
  if (!preserve_latest && scan_result.HasLatestProgressFile()) {
    filenames_to_delete.push_back(
        scan_result.latest_progress_filename.value());
  }
  return DeleteCompactionProgressFiles(filenames_to_delete);
}
// Parse the persisted compaction progress into compaction_progress_, then
// remove any physical output files in the secondary path that the loaded
// progress does not account for.
Status DBImplSecondary::LoadCompactionProgressAndCleanupExtraOutputFiles(
    const std::string& compaction_progress_file_path,
    const CompactionProgressFilesScan& scan_result) {
  Status s = ParseCompactionProgressFile(compaction_progress_file_path,
                                         &compaction_progress_);
  if (!s.ok()) {
    return s;
  }
  return CleanupPhysicalCompactionOutputFiles(/*preserve_tracked_files=*/true,
                                              scan_result);
}
// Read a compaction progress file (a log of encoded VersionEdit records)
// and accumulate its subcompaction progress into *compaction_progress.
// Returns NotFound when the file contained no usable progress, otherwise
// the first read or decode error encountered.
Status DBImplSecondary::ParseCompactionProgressFile(
    const std::string& compaction_progress_file_path,
    CompactionProgress* compaction_progress) {
  std::unique_ptr<FSSequentialFile> file;
  Status s = fs_->NewSequentialFile(compaction_progress_file_path,
                                    FileOptions(), &file, nullptr);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<SequentialFileReader> file_reader(new SequentialFileReader(
      std::move(file), compaction_progress_file_path,
      immutable_db_options_.log_readahead_size, io_tracer_, {},
      immutable_db_options_.rate_limiter.get()));

  // Reporter that records only the first corruption seen while reading.
  Status reader_status;
  struct CompactionProgressReaderReporter : public log::Reader::Reporter {
    Status* status;
    explicit CompactionProgressReaderReporter(Status* s) : status(s) {}
    void Corruption(size_t /*unused*/, const Status& s,
                    uint64_t /*unused*/) override {
      if (status->ok()) {
        *status = s;
      }
    }
    // Old log records are tolerated silently.
    void OldLogRecord(size_t /*unused*/) override {}
  } progress_reporter(&reader_status);
  log::Reader compaction_progress_reader(
      immutable_db_options_.info_log, std::move(file_reader),
      &progress_reporter, true, 0);
  SubcompactionProgressBuilder progress_builder;
  Slice slice;
  std::string record;
  // Decode each record as a VersionEdit and feed it to the builder; stop on
  // the first decode failure or on a record the builder rejects.
  while (compaction_progress_reader.ReadRecord(&slice, &record) &&
         reader_status.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(slice);
    if (!s.ok()) {
      break;
    }
    bool res = progress_builder.ProcessVersionEdit(edit);
    if (!res) {
      break;
    }
  }
  // A read-side corruption takes precedence over decode results.
  if (!reader_status.ok()) {
    return reader_status;
  }
  if (!s.ok()) {
    return s;
  }
  if (progress_builder.HasAccumulatedSubcompactionProgress()) {
    compaction_progress->clear();
    compaction_progress->push_back(
        progress_builder.GetAccumulatedSubcompactionProgress());
  } else {
    s = Status::NotFound("No compaction progress was persisted yet");
  }
  return s;
}
// Publish a temp progress file under a final, timestamped name. The name
// encodes the current wall-clock time in microseconds, so the newest
// progress file carries the largest number. *final_file_path is set to the
// chosen destination even if the rename itself fails.
Status DBImplSecondary::RenameCompactionProgressFile(
    const std::string& temp_file_path, std::string* final_file_path) {
  *final_file_path =
      CompactionProgressFileName(secondary_path_, env_->NowMicros());
  WriteOptions write_options(Env::IOActivity::kCompaction);
  IOOptions opts;
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (s.ok()) {
    s = fs_->RenameFile(temp_file_path, *final_file_path, opts, nullptr);
  }
  return s;
}
// Delete table files found in the secondary path by the scan. With
// preserve_tracked_files set, files recorded in compaction_progress_ are
// kept; otherwise every scanned table file is removed. Stops at the first
// delete failure.
Status DBImplSecondary::CleanupPhysicalCompactionOutputFiles(
    bool preserve_tracked_files,
    const CompactionProgressFilesScan& scan_result) {
  std::unordered_set<uint64_t> files_to_preserve;
  if (preserve_tracked_files) {
    // Gather the file numbers tracked by every subcompaction's progress, on
    // both the output level and the proximal output level.
    auto record = [&files_to_preserve](const auto& files) {
      for (const auto& meta : files) {
        files_to_preserve.insert(meta.fd.GetNumber());
      }
    };
    for (const auto& progress : compaction_progress_) {
      record(progress.output_level_progress.GetOutputFiles());
      record(progress.proximal_output_level_progress.GetOutputFiles());
    }
  }
  WriteOptions write_options(Env::IOActivity::kCompaction);
  IOOptions opts;
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (!s.ok()) {
    return s;
  }
  for (uint64_t file_number : scan_result.table_file_numbers) {
    if (preserve_tracked_files &&
        files_to_preserve.count(file_number) > 0) {
      continue;  // tracked by the loaded progress; keep it
    }
    Status delete_status = fs_->DeleteFile(
        MakeTableFileName(secondary_path_, file_number), opts, nullptr);
    if (!delete_status.ok()) {
      return delete_status;
    }
  }
  return Status::OK();
}
// Set up the secondary's compaction output directory and, when resumption
// is allowed, restore any persisted progress state and open the progress
// writer.
Status DBImplSecondary::InitializeCompactionWorkspace(
    bool allow_resumption, std::unique_ptr<FSDirectory>* output_dir,
    std::unique_ptr<log::Writer>* compaction_progress_writer) {
  Status s = CreateAndNewDirectory(fs_.get(), secondary_path_, output_dir);
  if (!s.ok() || !allow_resumption) {
    return s;
  }
  s = PrepareCompactionProgressState();
  if (s.ok()) {
    s = FinalizeCompactionProgressWriter(compaction_progress_writer);
  }
  return s;
}
// Decide whether a previously-interrupted remote compaction can be resumed.
// Scans the secondary path, always deletes old and temporary progress
// files, and either (a) loads the latest progress file and prunes untracked
// output files, or (b) falls back to a fresh start by clearing all progress
// state and existing outputs.
Status DBImplSecondary::PrepareCompactionProgressState() {
  Status s;
  CompactionProgressFilesScan scan_result;
  s = ScanCompactionProgressFiles(&scan_result);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Encountered error when scanning for compaction "
                    "progress files: %s",
                    s.ToString().c_str());
    return s;
  }
  std::optional<std::string> latest_progress_file =
      scan_result.latest_progress_filename;
  // Resumption is only possible when a latest progress file exists.
  bool should_resume = false;
  if (latest_progress_file.has_value()) {
    should_resume = true;
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Did not find any latest compaction progress file. "
                   "Will perform clean up to start fresh compaction");
  }
  if (should_resume) {
    // Keep the latest progress file; drop stale and temp ones.
    s = CleanupOldAndTemporaryCompactionProgressFiles(
        /*preserve_latest=*/true, scan_result);
  } else {
    s = CleanupOldAndTemporaryCompactionProgressFiles(
        /*preserve_latest=*/false, scan_result);
    latest_progress_file.reset();
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Failed to clean up compaction progress file(s): %s. "
                    "Will fail the compaction",
                    s.ToString().c_str());
    return s;
  }
  if (latest_progress_file.has_value()) {
    uint64_t timestamp = scan_result.latest_progress_timestamp;
    std::string compaction_progress_file_path =
        CompactionProgressFileName(secondary_path_, timestamp);
    s = LoadCompactionProgressAndCleanupExtraOutputFiles(
        compaction_progress_file_path, scan_result);
    if (!s.ok()) {
      // An unreadable or invalid progress file is not fatal: fall back to a
      // clean slate instead of failing the compaction.
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to load the latest compaction "
                     "progress from %s: %s. Will perform clean up "
                     "to start fresh compaction",
                     latest_progress_file.value().c_str(),
                     s.ToString().c_str());
      return HandleInvalidOrNoCompactionProgress(compaction_progress_file_path,
                                                 scan_result);
    }
    ROCKS_LOG_DEBUG(
        immutable_db_options_.info_log,
        "Loaded compaction progress with %zu subcompaction(s) from %s",
        compaction_progress_.size(), compaction_progress_file_path.c_str());
    return s;
  } else {
    return HandleInvalidOrNoCompactionProgress(
        /*compaction_progress_file_path=*/std::nullopt, scan_result);
  }
}
// Sum the sizes of all output files recorded across every subcompaction's
// progress (output level plus proximal output level); this is the amount of
// already-written data a resumed compaction gets to skip regenerating.
uint64_t DBImplSecondary::CalculateResumedCompactionBytes(
    const CompactionProgress& compaction_progress) const {
  uint64_t total_resumed_bytes = 0;
  auto add_sizes = [&total_resumed_bytes](const auto& files) {
    for (const auto& meta : files) {
      total_resumed_bytes += meta.fd.file_size;
    }
  };
  for (const auto& progress : compaction_progress) {
    add_sizes(progress.output_level_progress.GetOutputFiles());
    add_sizes(progress.proximal_output_level_progress.GetOutputFiles());
  }
  return total_resumed_bytes;
}
// Resets compaction progress state so the compaction starts from scratch:
// clears the in-memory progress, deletes `compaction_progress_file_path`
// when one is given (an invalid/unreadable progress file), and removes any
// physical compaction output files left behind by a previous attempt.
Status DBImplSecondary::HandleInvalidOrNoCompactionProgress(
    const std::optional<std::string>& compaction_progress_file_path,
    const CompactionProgressFilesScan& scan_result) {
  compaction_progress_.clear();
  if (compaction_progress_file_path.has_value()) {
    const WriteOptions write_options(Env::IOActivity::kCompaction);
    IOOptions opts;
    Status delete_status =
        WritableFileWriter::PrepareIOOptions(write_options, opts);
    if (delete_status.ok()) {
      delete_status =
          fs_->DeleteFile(compaction_progress_file_path.value(), opts, nullptr);
    }
    if (!delete_status.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Failed to remove invalid progress file: %s",
                      delete_status.ToString().c_str());
      return delete_status;
    }
  }
  // Remove leftover output files that can no longer be attributed to any
  // recorded progress.
  Status cleanup_status =
      CleanupPhysicalCompactionOutputFiles(false, scan_result);
  if (!cleanup_status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Failed to cleanup existing compaction output files: %s",
                    cleanup_status.ToString().c_str());
    return cleanup_status;
  }
  return Status::OK();
}
// Runs the compaction described by `input` (shipped from a compaction-service
// primary) against column family `cfh` on this secondary and fills `result`.
// The output is NOT installed into the LSM tree here; the primary installs it
// after receiving `result`.
//
// Returns Incomplete(kManualCompactionPaused) when canceled, otherwise the
// first error encountered while preparing or running the compaction job.
// `result->status` is set to the same final status.
Status DBImplSecondary::CompactWithoutInstallation(
    const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh,
    const CompactionServiceInput& input, CompactionServiceResult* result) {
  // Early-out if cancellation was already requested.
  if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  std::unique_ptr<FSDirectory> output_dir;
  std::unique_ptr<log::Writer> compaction_progress_writer;
  // Held for the whole function except the two explicit Unlock/Lock windows
  // below; released automatically on every return path.
  InstrumentedMutexLock l(&mutex_);
  auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
  if (!cfd) {
    return Status::InvalidArgument("Cannot find column family" +
                                   cfh->GetName());
  }
  Status s;
  const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
  // Resumption is disabled whenever output hash verification is configured;
  // the warning below states the two features are incompatible.
  bool output_hash_verification_enabled =
      mutable_cf_options.paranoid_file_checks ||
      !!(mutable_cf_options.verify_output_flags &
         VerifyOutputFlags::kVerifyIteration);
  bool allow_resumption =
      options.allow_resumption && !output_hash_verification_enabled;
  if (options.allow_resumption && output_hash_verification_enabled) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Resume compaction configured but disabled due to "
                   "incompatibility with output hash verification "
                   "(paranoid_file_checks=true or verify_output_flags "
                   "containing kVerifyIteration)");
  }
  // Workspace initialization is done without the DB mutex held (it
  // presumably performs file-system I/O — confirm against its definition).
  mutex_.Unlock();
  s = InitializeCompactionWorkspace(allow_resumption, &output_dir,
                                    &compaction_progress_writer);
  mutex_.Lock();
  if (!s.ok()) {
    return s;
  }
  // Translate the primary-supplied input file names into file numbers.
  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input.input_files) {
    input_set.insert(TableFileNameToNumber(file_name));
  }
  auto* version = cfd->current();
  ColumnFamilyMetaData cf_meta;
  version->GetColumnFamilyMetaData(&cf_meta);
  VersionStorageInfo* vstorage = version->storage_info();
  CompactionOptions comp_options;
  comp_options.compression = kDisableCompressionOption;
  comp_options.output_file_size_limit = MaxFileSizeForLevel(
      mutable_cf_options, input.output_level, cfd->ioptions().compaction_style,
      vstorage->base_level(),
      cfd->ioptions().level_compaction_dynamic_level_bytes);
  // Resolve the file numbers into per-level CompactionInputFiles for the
  // compaction picker.
  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, vstorage, comp_options);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "GetCompactionInputsFromFileNumbers() failed - %s.\n DebugString: %s",
        s.ToString().c_str(), version->DebugString(true).c_str());
    return s;
  }
  const int job_id = next_job_id_.fetch_add(1);
  JobContext job_context(job_id, true /* create_superversion */);
  // Use the snapshot list shipped by the primary, not any local snapshots.
  std::vector<SequenceNumber> snapshots = input.snapshots;
  job_context.InitSnapshotContext(nullptr, nullptr, kMaxSequenceNumber,
                                  std::move(snapshots));
  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  // Only universal compaction receives an earliest-snapshot value here:
  // the front of the (sorted) snapshot list, or kMaxSequenceNumber when
  // there are no snapshots.
  std::optional<SequenceNumber> earliest_snapshot = std::nullopt;
  if (cfd->GetLatestCFOptions().compaction_style ==
      CompactionStyle::kCompactionStyleUniversal) {
    earliest_snapshot = !job_context.snapshot_seqs.empty()
                            ? job_context.snapshot_seqs.front()
                            : kMaxSequenceNumber;
  }
  c.reset(cfd->compaction_picker()->PickCompactionForCompactFiles(
      comp_options, input_files, input.output_level, vstorage,
      mutable_cf_options, mutable_db_options_, 0, earliest_snapshot,
      job_context.snapshot_checker));
  assert(c != nullptr);
  c->FinalizeInputInfo(version);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  CompactionServiceCompactionJob compaction_job(
      job_id, c.get(), immutable_db_options_, mutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
      &job_context, table_cache_, &event_logger_, dbname_, io_tracer_,
      options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
      input.db_id, db_session_id_, secondary_path_, input, result);
  // Hand the job any previously loaded progress (for resumption) plus the
  // writer used to persist further progress records.
  compaction_job.Prepare(compaction_progress_,
                         compaction_progress_writer.get());
  // The compaction itself runs without the DB mutex held.
  mutex_.Unlock();
  s = compaction_job.Run();
  mutex_.Lock();
  compaction_job.io_status().PermitUncheckedError();
  compaction_job.CleanupCompaction();
  c->ReleaseCompactionFiles(s);
  c.reset();
  TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
                           &s);
  // On success of a resumed compaction, record how many output bytes were
  // reused rather than rewritten.
  if (!compaction_progress_.empty() && s.ok()) {
    uint64_t total_resumed_bytes =
        CalculateResumedCompactionBytes(compaction_progress_);
    if (total_resumed_bytes > 0 &&
        immutable_db_options_.statistics != nullptr) {
      RecordTick(immutable_db_options_.statistics.get(),
                 REMOTE_COMPACT_RESUMED_BYTES, total_resumed_bytes);
    }
  }
  result->status = s;
  return s;
}
// Compaction-service worker entry point: deserializes `input`, opens `name`
// as a secondary DB rooted at `output_directory`, runs the requested
// compaction without installation, and serializes the result into `output`.
//
// Options are loaded from the primary's OPTIONS file identified by
// `input.options_file_number`, then overridden by `override_options` (both
// as an options map and as explicit pointer-typed fields that cannot be
// expressed in a map, e.g. comparator and table_factory).
Status DB::OpenAndCompact(
    const OpenAndCompactOptions& options, const std::string& name,
    const std::string& output_directory, const std::string& input,
    std::string* output,
    const CompactionServiceOptionsOverride& override_options) {
  // Honor cancellation requested before any work is done.
  if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  CompactionServiceInput compaction_input;
  Status s = CompactionServiceInput::Read(input, &compaction_input);
  if (!s.ok()) {
    return s;
  }
  DBOptions base_db_options;
  ConfigOptions config_options;
  config_options.env = override_options.env;
  // The OPTIONS file may contain options this binary does not know about;
  // tolerate them instead of failing the load.
  config_options.ignore_unknown_options = true;
  std::vector<ColumnFamilyDescriptor> all_column_families;
  TEST_SYNC_POINT_CALLBACK(
      "DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:0",
      &compaction_input.options_file_number);
  TEST_SYNC_POINT("DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:1");
  std::string options_file_name =
      OptionsFileName(name, compaction_input.options_file_number);
  s = LoadOptionsFromFile(config_options, options_file_name, &base_db_options,
                          &all_column_families);
  if (!s.ok()) {
    return s;
  }
  // Apply string-map DB option overrides on top of the loaded options.
  DBOptions db_options;
  s = GetDBOptionsFromMap(config_options, base_db_options,
                          override_options.options_map, &db_options);
  if (!s.ok()) {
    return s;
  }
  // Pointer-typed overrides cannot round-trip through the options map; set
  // them explicitly after the map-based override.
  db_options.env = override_options.env;
  db_options.file_checksum_gen_factory =
      override_options.file_checksum_gen_factory;
  db_options.statistics = override_options.statistics;
  db_options.listeners = override_options.listeners;
  // The worker must not itself delegate to a compaction service —
  // presumably to avoid recursive remote compaction; confirm upstream.
  db_options.compaction_service = nullptr;
  // -1 removes the open-file limit for this worker instance.
  db_options.max_open_files = -1;
  db_options.info_log = override_options.info_log;
  // Open only the target column family plus the mandatory default CF.
  std::vector<ColumnFamilyDescriptor> column_families;
  for (auto& cf : all_column_families) {
    if (cf.name == compaction_input.cf_name) {
      ColumnFamilyOptions cf_options;
      s = GetColumnFamilyOptionsFromMap(config_options, cf.options,
                                        override_options.options_map,
                                        &cf_options);
      if (!s.ok()) {
        return s;
      }
      cf.options = std::move(cf_options);
      // As with DBOptions above, pointer-typed CF overrides are applied
      // after the map-based override.
      cf.options.comparator = override_options.comparator;
      cf.options.merge_operator = override_options.merge_operator;
      cf.options.compaction_filter = override_options.compaction_filter;
      cf.options.compaction_filter_factory =
          override_options.compaction_filter_factory;
      cf.options.prefix_extractor = override_options.prefix_extractor;
      cf.options.table_factory = override_options.table_factory;
      cf.options.sst_partitioner_factory =
          override_options.sst_partitioner_factory;
      cf.options.table_properties_collector_factories =
          override_options.table_properties_collector_factories;
      column_families.emplace_back(cf);
    } else if (cf.name == kDefaultColumnFamilyName) {
      column_families.emplace_back(cf);
    }
  }
  std::unique_ptr<DB> db;
  std::vector<ColumnFamilyHandle*> handles;
  s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
                          &handles, &db);
  if (!s.ok()) {
    return s;
  }
  assert(db);
  TEST_SYNC_POINT_CALLBACK(
      "DBImplSecondary::OpenAndCompact::AfterOpenAsSecondary:0", db.get());
  // Find the handle of the column family being compacted.
  ColumnFamilyHandle* cfh = nullptr;
  for (auto* handle : handles) {
    if (compaction_input.cf_name == handle->GetName()) {
      cfh = handle;
      break;
    }
  }
  assert(cfh);
  CompactionServiceResult compaction_result;
  DBImplSecondary* db_secondary =
      static_cast_with_check<DBImplSecondary>(db.get());
  s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
                                               &compaction_result);
  // Serialize the result regardless of compaction status: the result carries
  // its own status field for the primary to inspect.
  Status serialization_status = compaction_result.Write(output);
  for (auto& handle : handles) {
    delete handle;
  }
  db.reset();
  if (s.ok()) {
    // Compaction succeeded: the serialization status is the overall outcome.
    return serialization_status;
  } else {
    // Compaction failed: report that failure and discard the (unchecked)
    // serialization status.
    serialization_status.PermitUncheckedError();
  }
  return s;
}
// Convenience overload: runs OpenAndCompact() with default
// OpenAndCompactOptions.
Status DB::OpenAndCompact(
    const std::string& name, const std::string& output_directory,
    const std::string& input, std::string* output,
    const CompactionServiceOptionsOverride& override_options) {
  OpenAndCompactOptions default_options;
  return OpenAndCompact(default_options, name, output_directory, input, output,
                        override_options);
}
// Creates a new writable file at `file_path` and wraps it in a log::Writer
// that will be used to append compaction progress records.
Status DBImplSecondary::CreateCompactionProgressWriter(
    const std::string& file_path,
    std::unique_ptr<log::Writer>* compaction_progress_writer) {
  const FileOptions file_options;
  std::unique_ptr<FSWritableFile> writable_file;
  Status s =
      fs_->NewWritableFile(file_path, file_options, &writable_file, nullptr);
  if (s.ok()) {
    auto file_writer = std::make_unique<WritableFileWriter>(
        std::move(writable_file), file_path, file_options);
    *compaction_progress_writer =
        std::make_unique<log::Writer>(std::move(file_writer), 0, false);
  }
  return s;
}
// Persists the (single) subcompaction's progress as one VersionEdit record
// in the progress log and syncs the file so the record survives a crash.
// `compaction_progress` must contain exactly one subcompaction.
Status DBImplSecondary::PersistInitialCompactionProgress(
    log::Writer* compaction_progress_writer,
    const CompactionProgress& compaction_progress) {
  assert(compaction_progress_writer);
  assert(compaction_progress.size() == 1);
  // Encode the subcompaction progress into a VersionEdit record.
  VersionEdit edit;
  edit.SetSubcompactionProgress(compaction_progress[0]);
  std::string record;
  if (!edit.EncodeTo(&record)) {
    return Status::IOError("Failed to encode the initial compaction progress");
  }
  const WriteOptions write_options(Env::IOActivity::kCompaction);
  Status s = compaction_progress_writer->AddRecord(write_options, record);
  if (s.ok()) {
    IOOptions io_opts;
    s = WritableFileWriter::PrepareIOOptions(write_options, io_opts);
    if (s.ok()) {
      // Sync (or fsync, per DB options) so the record is durable.
      s = compaction_progress_writer->file()->Sync(
          io_opts, immutable_db_options_.use_fsync);
    }
  }
  return s;
}
// Best-effort cleanup after a compaction-progress-writer setup failure:
// releases the writer (closing any open file) and deletes the final and
// temporary progress files.
//
// Callers pass an empty string for whichever path was never created (e.g.
// the final path when the temp file could not be written), and after a
// failed rename one of the two paths may not exist on disk. The previous
// version called DeleteFile("") unconditionally, which failed, returned a
// spurious error, and skipped deleting the real file — so empty paths are
// now skipped and not-found results are treated as success.
Status DBImplSecondary::HandleCompactionProgressWriterCreationFailure(
    const std::string& temp_file_path, const std::string& final_file_path,
    std::unique_ptr<log::Writer>* compaction_progress_writer) {
  // Close the writer before deleting the file it may have open.
  compaction_progress_writer->reset();
  const std::vector<std::string> paths_to_delete = {final_file_path,
                                                    temp_file_path};
  Status s;
  for (const auto& file_path : paths_to_delete) {
    if (file_path.empty()) {
      // This path was never created; nothing to delete.
      continue;
    }
    WriteOptions write_options(Env::IOActivity::kCompaction);
    IOOptions opts;
    s = WritableFileWriter::PrepareIOOptions(write_options, opts);
    if (s.ok()) {
      s = fs_->DeleteFile(file_path, opts, nullptr);
    }
    if (s.IsNotFound() || s.IsPathNotFound()) {
      // The file does not exist (e.g. the rename never produced it);
      // cleanup is best-effort, so this is not an error.
      s = Status::OK();
      continue;
    }
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Failed to cleanup the compaction progress file "
                      "during writer creation failure: %s",
                      s.ToString().c_str());
      return s;
    }
  }
  return s;
}
// Materializes the compaction progress file and leaves
// `*compaction_progress_writer` open on its final path:
//   1. Create a writer on a timestamped temporary file.
//   2. If resuming, persist the loaded progress into it.
//   3. Close the temp file and rename it to its final name.
//   4. Re-open the writer on the final path.
// Every failure degrades gracefully: the partial files are cleaned up and
// the compaction proceeds without progress persistence.
//
// Fixes two log-message defects: "copmaction" -> "compaction", and
// "Will attempt clean to start" -> "Will attempt clean up to start".
Status DBImplSecondary::FinalizeCompactionProgressWriter(
    std::unique_ptr<log::Writer>* compaction_progress_writer) {
  uint64_t timestamp = env_->NowMicros();
  const std::string temp_file_path =
      TempCompactionProgressFileName(secondary_path_, timestamp);
  Status s = CreateCompactionProgressWriter(temp_file_path,
                                            compaction_progress_writer);
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Failed to create compaction progress writer at "
                   "temp path %s: %s. Will perform clean up "
                   "to start compaction without progress persistence",
                   temp_file_path.c_str(), s.ToString().c_str());
    // No final file exists yet; pass "" for it.
    return HandleCompactionProgressWriterCreationFailure(
        temp_file_path, "", compaction_progress_writer);
  }
  if (!compaction_progress_.empty()) {
    // Resuming: write the already-loaded progress into the new file so the
    // progress file is self-contained.
    s = PersistInitialCompactionProgress(compaction_progress_writer->get(),
                                         compaction_progress_);
    if (!s.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to persist the initial compaction "
                     "progress: %s. Will perform clean up "
                     "to start compaction without progress persistence",
                     s.ToString().c_str());
      return HandleCompactionProgressWriterCreationFailure(
          temp_file_path, "", compaction_progress_writer);
    }
  }
  // Close the temp file before renaming it.
  compaction_progress_writer->reset();
  std::string final_file_path;
  s = RenameCompactionProgressFile(temp_file_path, &final_file_path);
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Failed to rename temporary compaction progress "
                   "file from %s to %s: %s. Will perform clean up "
                   "to start compaction without progress persistence",
                   temp_file_path.c_str(), final_file_path.c_str(),
                   s.ToString().c_str());
    return HandleCompactionProgressWriterCreationFailure(
        temp_file_path, final_file_path, compaction_progress_writer);
  }
  // NOTE(review): NewWritableFile typically truncates an existing file, so
  // re-opening the just-renamed file here appears to discard any record
  // persisted above — confirm that the caller (Prepare) re-writes the
  // progress, or whether ReopenWritableFile (append) is intended.
  s = CreateCompactionProgressWriter(final_file_path,
                                     compaction_progress_writer);
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Failed to create the final compaction progress "
                   "writer: %s. Will attempt clean up to start the compaction "
                   "without progress persistence",
                   s.ToString().c_str());
    // The temp file no longer exists after the successful rename.
    return HandleCompactionProgressWriterCreationFailure(
        "", final_file_path, compaction_progress_writer);
  }
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                  "Finalized compaction progress writer onto %s",
                  final_file_path.c_str());
  return Status::OK();
}
}