#include <cinttypes>
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/periodic_task_scheduler.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h"
#include "util/rate_limiter_impl.h"
#include "util/string_util.h"
#include "util/udt_util.h"
namespace ROCKSDB_NAMESPACE {
// Produces a fully sanitized Options by sanitizing the DB-level and the
// column-family-level pieces separately, then recombining them.
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only, Status* logger_creation_s) {
  DBOptions sanitized_db_opts =
      SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
  ImmutableDBOptions immutable_db_opts(sanitized_db_opts);
  ColumnFamilyOptions sanitized_cf_opts = SanitizeCfOptions(
      immutable_db_opts, read_only, ColumnFamilyOptions(src));
  return Options(sanitized_db_opts, sanitized_cf_opts);
}
// Sanitizes DB-level options: fills in defaults (env, logger, write buffer
// manager, sst file manager), clamps out-of-range values, and disables
// feature combinations that conflict with each other. A logger creation
// failure is reported through `logger_creation_s` (when non-null) instead of
// aborting sanitization.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
                          bool read_only, Status* logger_creation_s) {
  DBOptions result(src);

  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // max_open_files == -1 means unlimited; otherwise clamp to what the
  // platform allows.
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      max_max_open_files = 0x400000;  // 4M: effectively unlimited
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  // Read-only opens do not create a logger.
  if (result.info_log == nullptr && !read_only) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging; report via logger_creation_s.
      result.info_log = nullptr;
      if (logger_creation_s) {
        *logger_creation_s = s;
      }
    }
  }

  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }

  // Make sure the env has enough background threads for the configured jobs.
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  // Default delayed_write_rate from the rate limiter; fall back to 16MB/s.
  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

  // WAL archiving (TTL/size limits) is incompatible with WAL recycling.
  // FIX: was `= false`; recycle_log_file_num is a count, so assign 0 (same
  // value, correct type/intent, consistent with the assignment below).
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = 0;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // Recycled WALs can carry stale tail data that these strict recovery
    // modes cannot distinguish from corruption.
    result.recycle_log_file_num = 0;
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  } else if (result.wal_dir.empty()) {
    result.wal_dir = dbname;
  }

  // Clear wal_dir when it is redundant (identical to the primary db path).
  if (!result.wal_dir.empty()) {
    auto npath = NormalizePath(dbname + "/");
    if (npath == NormalizePath(result.wal_dir + "/") &&
        npath == NormalizePath(result.db_paths[0].path + "/")) {
      result.wal_dir.clear();
    }
  }
  // Strip a trailing slash from wal_dir.
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

  // Best-effort cleanup of leftover "*.log.trash" files when the WAL dir is
  // separate from the db path (the SstFileManager won't cover it).
  ImmutableDBOptions immutable_db_options(result);
  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
    std::vector<std::string> filenames;
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    auto wal_dir = immutable_db_options.GetWalDir();
    Status s = immutable_db_options.fs->GetChildren(wal_dir, io_opts,
                                                    &filenames, nullptr);
    s.PermitUncheckedError();  // cleanup is best-effort
    for (std::string& filename : filenames) {
      if (filename.find(".log.trash",
                        filename.length() -
                            std::string(".log.trash").length()) !=
          std::string::npos) {
        std::string trash_file = wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file).PermitUncheckedError();
      }
    }
  }

  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }

  // WAL compression requires streaming compression support (zstd only).
  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
    result.wal_compression = kNoCompression;
    ROCKS_LOG_WARN(result.info_log,
                   "wal_compression is disabled since only zstd is supported");
  }

  return result;
}
namespace {
// Validates each column family's options against the given DB options,
// returning the first failure encountered (OK when all pass).
Status ValidateOptionsByTable(
    const DBOptions& db_opts,
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  for (const auto& cf_desc : column_families) {
    Status validation = ValidateOptions(db_opts, cf_desc.options);
    if (!validation.ok()) {
      return validation;
    }
  }
  return Status::OK();
}
}  // namespace
// Validates options for every column family descriptor, then the DB-wide
// options. The default column family additionally must not disable memtable
// writes.
Status DBImpl::ValidateOptions(
    const DBOptions& db_options,
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  for (const auto& cf_desc : column_families) {
    Status cf_status =
        ColumnFamilyData::ValidateOptions(db_options, cf_desc.options);
    if (!cf_status.ok()) {
      return cf_status;
    }
    if (cf_desc.name == kDefaultColumnFamilyName &&
        cf_desc.options.disallow_memtable_writes) {
      return Status::InvalidArgument(
          "Default column family cannot use disallow_memtable_writes=true");
    }
  }
  // Per-CF checks passed; finish with the DB-wide checks.
  return ValidateOptions(db_options);
}
// Validates DB-wide options that do not depend on any particular column
// family: path count, mmap vs direct-I/O conflicts, pipelined/unordered/
// atomic-flush write-path incompatibilities, offpeak time format, and the
// dbid persistence settings. Returns NotSupported/InvalidArgument on the
// first violation found, OK otherwise.
Status DBImpl::ValidateOptions(const DBOptions& db_options) {
  if (db_options.db_paths.size() > 4) {
    return Status::NotSupported(
        "More than four DB paths are not supported yet. ");
  }
  // mmap and direct I/O are mutually exclusive on both read and write paths.
  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
    return Status::NotSupported(
        "If memory mapped reads (allow_mmap_reads) are enabled "
        "then direct I/O reads (use_direct_reads) must be disabled. ");
  }
  if (db_options.allow_mmap_writes &&
      db_options.use_direct_io_for_flush_and_compaction) {
    return Status::NotSupported(
        "If memory mapped writes (allow_mmap_writes) are enabled "
        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
        "be disabled. ");
  }
  if (db_options.keep_log_file_num == 0) {
    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
  }
  if (db_options.unordered_write &&
      !db_options.allow_concurrent_memtable_write) {
    return Status::InvalidArgument(
        "unordered_write is incompatible with "
        "!allow_concurrent_memtable_write");
  }
  if (db_options.unordered_write && db_options.enable_pipelined_write) {
    return Status::InvalidArgument(
        "unordered_write is incompatible with enable_pipelined_write");
  }
  if (db_options.atomic_flush && db_options.enable_pipelined_write) {
    return Status::InvalidArgument(
        "atomic_flush is incompatible with enable_pipelined_write");
  }
  if (db_options.use_direct_io_for_flush_and_compaction &&
      0 == db_options.writable_file_max_buffer_size) {
    return Status::InvalidArgument(
        "writes in direct IO require writable_file_max_buffer_size > 0");
  }
  // Idiom fix: use .empty() instead of comparing against "" (avoids a
  // temporary string construction).
  if (!db_options.daily_offpeak_time_utc.empty()) {
    int start_time = 0;
    int end_time = 0;
    if (!TryParseTimeRangeString(db_options.daily_offpeak_time_utc, start_time,
                                 end_time)) {
      return Status::InvalidArgument(
          "daily_offpeak_time_utc should be set in the format HH:mm-HH:mm "
          "(e.g. 04:30-07:30)");
    } else if (start_time == end_time) {
      return Status::InvalidArgument(
          "start_time and end_time cannot be the same");
    }
  }
  // The DB id must be persisted somewhere: MANIFEST and/or IDENTITY file.
  if (!db_options.write_dbid_to_manifest && !db_options.write_identity_file) {
    return Status::InvalidArgument(
        "write_dbid_to_manifest and write_identity_file cannot both be false");
  }
  return Status::OK();
}
// Creates a brand-new DB: writes MANIFEST-000001 containing a minimal
// VersionEdit (DB id, log number 0, next file 2, last sequence 0), then
// points CURRENT at it. On any failure the partially written manifest is
// deleted. When `new_filenames` is non-null the manifest's base name is
// appended to it.
Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
  VersionEdit new_db_edit;
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
  // read_only=false, is_new_db=true, is_retry=false
  Status s = SetupDBId(write_options, false, true, false, &new_db_edit);
  if (!s.ok()) {
    return s;
  }
  new_db_edit.SetLogNumber(0);
  new_db_edit.SetNextFile(2);
  new_db_edit.SetLastSequence(0);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    // Remove any leftover manifest from a previous failed creation attempt.
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
    }
    std::unique_ptr<FSWritableFile> file;
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
    if (immutable_db_options_.metadata_write_temperature !=
        Temperature::kUnknown) {
      file_options.temperature =
          immutable_db_options_.metadata_write_temperature;
    }
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
    if (!s.ok()) {
      return s;
    }
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    file->SetPreallocationBlockSize(
        mutable_db_options_.manifest_preallocation_size);
    // Both checksum-handoff and rate-limiting flags are driven by whether the
    // descriptor file type is in the handoff set.
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(file), manifest, file_options, immutable_db_options_.clock,
        io_tracer_, nullptr,
        Histograms::HISTOGRAM_ENUM_MAX,
        immutable_db_options_.listeners, nullptr,
        tmp_set.Contains(FileType::kDescriptorFile),
        tmp_set.Contains(FileType::kDescriptorFile)));
    log::Writer log(std::move(file_writer), 0, false);
    std::string record;
    new_db_edit.EncodeTo(&record);
    s = log.AddRecord(write_options, record);
    if (s.ok()) {
      // Make sure the manifest is durable before CURRENT references it.
      s = SyncManifest(&immutable_db_options_, write_options, log.file());
    }
  }
  if (s.ok()) {
    // Point CURRENT at the freshly written manifest.
    s = SetCurrentFile(write_options, fs_.get(), dbname_, 1,
                       immutable_db_options_.metadata_write_temperature,
                       directories_.GetDbDir());
    if (new_filenames) {
      new_filenames->emplace_back(
          manifest.substr(manifest.find_last_of("/\\") + 1));
    }
  } else {
    // Creation failed: best-effort removal of the partial manifest.
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
  }
  return s;
}
// Creates `dirname` if it does not already exist, then opens it as an
// FSDirectory handle (used later to fsync directory metadata).
IOStatus DBImpl::CreateAndNewDirectory(
    FileSystem* fs, const std::string& dirname,
    std::unique_ptr<FSDirectory>* directory) {
  const IOStatus create_status =
      fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
  if (!create_status.ok()) {
    return create_status;
  }
  return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
}
// Opens directory handles for the db dir, the wal dir (only when distinct
// from the db dir), and every configured data path. Data paths equal to
// `dbname` get a null placeholder so data_dirs_ stays index-aligned with
// data_paths. Returns the first I/O failure encountered.
IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
                                     const std::string& wal_dir,
                                     const std::vector<DbPath>& data_paths) {
  IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
  if (!io_s.ok()) {
    return io_s;
  }
  if (!wal_dir.empty() && dbname != wal_dir) {
    io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
    if (!io_s.ok()) {
      return io_s;
    }
  }
  data_dirs_.clear();
  for (auto& p : data_paths) {
    const std::string db_path = p.path;
    if (db_path == dbname) {
      // Same as the db dir: store a null entry rather than a second handle.
      data_dirs_.emplace_back(nullptr);
    } else {
      std::unique_ptr<FSDirectory> path_directory;
      io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
      if (!io_s.ok()) {
        return io_s;
      }
      data_dirs_.emplace_back(path_directory.release());
    }
  }
  // One entry (possibly null) per configured data path.
  assert(data_dirs_.size() == data_paths.size());
  return IOStatus::OK();
}
// Recovers DB state at open time:
//  1. (read-write only) sets up directories, takes the file lock, and locates
//     or creates the MANIFEST (NewDB() when create_if_missing and nothing
//     exists yet).
//  2. Replays the MANIFEST via VersionSet::Recover (or TryRecover for
//     best-efforts recovery).
//  3. With level_compaction_dynamic_level_bytes, trivially moves files down
//     to the bottommost possible levels (recorded in recovery_ctx, applied
//     only if recovery succeeds).
//  4. Sets up the DB id, per-CF directories, verifies WAL bookkeeping, and
//     replays WALs through RecoverLogFiles.
// On a MANIFEST corruption with a filesystem supporting
// verify-and-reconstruct reads, *can_retry is set so the caller may retry
// with is_retry=true. Requires mutex_ held.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
    bool is_retry, uint64_t* recovered_seq, RecoveryContext* recovery_ctx,
    bool* can_retry) {
  mutex_.AssertHeld();
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
  bool tmp_is_new_db = false;
  // Record "new DB" into the recovery context when one was provided.
  bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
  assert(db_lock_ == nullptr);
  std::vector<std::string> files_in_dbname;
  if (!read_only) {
    // Open/create directories and take the DB file lock.
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }
    std::string current_fname = CurrentFileName(dbname_);
    std::string manifest_path;
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->FileExists(current_fname);
    } else {
      // Best-efforts recovery ignores CURRENT: scan the db dir for the first
      // non-empty MANIFEST instead.
      s = Status::NotFound();
      IOOptions io_opts;
      io_opts.do_not_recurse = true;
      Status io_s = immutable_db_options_.fs->GetChildren(
          dbname_, io_opts, &files_in_dbname, nullptr);
      if (!io_s.ok()) {
        s = io_s;
        files_in_dbname.clear();
      }
      for (const std::string& file : files_in_dbname) {
        uint64_t number = 0;
        FileType type = kWalFile;  // initialized; overwritten by ParseFileName
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
          uint64_t bytes;
          s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
          if (s.ok() && bytes != 0) {
            // Found a usable (non-empty) MANIFEST.
            manifest_path = dbname_ + "/" + file;
            break;
          }
        }
      }
    }
    if (s.IsNotFound()) {
      // No DB on disk: create one when allowed, else fail.
      if (immutable_db_options_.create_if_missing) {
        s = NewDB(&files_in_dbname);
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error while probing for the DB's existence.
      assert(s.IsIOError());
      return s;
    }
    {
      // Sanity-check that the filesystem supports the configured direct-I/O
      // settings by opening the MANIFEST (or CURRENT) with them.
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Retry without direct reads to distinguish "direct I/O unsupported"
        // from other filesystem incompatibilities.
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  } else if (immutable_db_options_.best_efforts_recovery) {
    // Read-only + best-efforts recovery still needs the dir listing.
    assert(files_in_dbname.empty());
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    Status s = immutable_db_options_.fs->GetChildren(
        dbname_, io_opts, &files_in_dbname, nullptr);
    if (s.IsNotFound()) {
      return Status::InvalidArgument(dbname_,
                                     "does not exist (open for read only)");
    } else if (s.IsIOError()) {
      return s;
    }
    assert(s.ok());
  }
  assert(is_new_db || db_id_.empty());
  Status s;
  bool missing_table_file = false;
  if (!immutable_db_options_.best_efforts_recovery) {
    // Replay the MANIFEST into the in-memory VersionSet.
    Status desc_status;
    s = versions_->Recover(column_families, read_only, &db_id_,
                           false, is_retry,
                           &desc_status);
    desc_status.PermitUncheckedError();
    if (is_retry) {
      RecordTick(stats_, FILE_READ_CORRUPTION_RETRY_COUNT);
      if (desc_status.ok()) {
        RecordTick(stats_, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
      }
    }
    if (can_retry) {
      // Offer a retry only when corruption might be repaired by the FS's
      // verify-and-reconstruct read support, and we haven't retried already.
      if (!is_retry &&
          (desc_status.IsCorruption() || s.IsNotFound() || s.IsCorruption()) &&
          CheckFSFeatureSupport(fs_.get(),
                                FSSupportedOps::kVerifyAndReconstructRead)) {
        *can_retry = true;
        ROCKS_LOG_ERROR(
            immutable_db_options_.info_log,
            "Possible corruption detected while replaying MANIFEST %s, %s. "
            "Will be retried.",
            desc_status.ToString().c_str(), s.ToString().c_str());
      } else {
        *can_retry = false;
      }
    }
  } else {
    assert(!files_in_dbname.empty());
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
                              &db_id_, &missing_table_file);
    if (s.ok()) {
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (s.ok() && !read_only) {
    // With dynamic level bytes, trivially move files as far down the LSM as
    // possible so per-level targets are computed bottom-up.
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      const auto& moptions = cfd->GetLatestMutableCFOptions();
      if (cfd->ioptions().compaction_style ==
              CompactionStyle::kCompactionStyleLevel &&
          cfd->ioptions().level_compaction_dynamic_level_bytes &&
          !moptions.disable_auto_compactions) {
        int to_level = cfd->ioptions().num_levels - 1;
        // Keep the last level clear for ingest-behind / precluded data.
        if (cfd->AllowIngestBehind() ||
            moptions.preclude_last_level_data_seconds > 0) {
          to_level -= 1;
        }
        bool moved = false;
        // Scan bottom-up; L0 files are never moved.
        for (int from_level = to_level; from_level >= 0; --from_level) {
          const std::vector<FileMetaData*>& level_files =
              cfd->current()->storage_info()->LevelFiles(from_level);
          if (level_files.empty() || from_level == 0) {
            continue;
          }
          assert(from_level <= to_level);
          if (from_level < to_level) {
            if (!moved) {
              // Log the LSM shape once, before the first move.
              std::string lsm_state = "[";
              for (int i = 0; i < cfd->ioptions().num_levels; ++i) {
                lsm_state += std::to_string(
                    cfd->current()->storage_info()->NumLevelFiles(i));
                if (i < cfd->ioptions().num_levels - 1) {
                  lsm_state += ",";
                }
              }
              lsm_state += "]";
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
                             "[%s] Trivially move files down the LSM when open "
                             "with level_compaction_dynamic_level_bytes=true,"
                             " lsm_state: %s (Files are moved only if DB "
                             "Recovery is successful).",
                             cfd->GetName().c_str(), lsm_state.c_str());
              moved = true;
            }
            ROCKS_LOG_WARN(
                immutable_db_options_.info_log,
                "[%s] Moving %zu files from from_level-%d to from_level-%d",
                cfd->GetName().c_str(), level_files.size(), from_level,
                to_level);
            // Re-add every file at to_level and delete it from from_level,
            // preserving all of its metadata.
            VersionEdit edit;
            edit.SetColumnFamily(cfd->GetID());
            for (const FileMetaData* f : level_files) {
              edit.DeleteFile(from_level, f->fd.GetNumber());
              edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                           f->fd.GetFileSize(), f->smallest, f->largest,
                           f->fd.smallest_seqno, f->fd.largest_seqno,
                           f->marked_for_compaction,
                           f->temperature, f->oldest_blob_file_number,
                           f->oldest_ancester_time,
                           f->file_creation_time, f->epoch_number,
                           f->file_checksum, f->file_checksum_func_name,
                           f->unique_id, f->compensated_range_deletion_size,
                           f->tail_size, f->user_defined_timestamps_persisted,
                           f->min_timestamp, f->max_timestamp);
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
                             "[%s] Moving #%" PRIu64
                             " from from_level-%d to from_level-%d %" PRIu64
                             " bytes\n",
                             cfd->GetName().c_str(), f->fd.GetNumber(),
                             from_level, to_level, f->fd.GetFileSize());
            }
            // Applied later, only if recovery as a whole succeeds.
            recovery_ctx->UpdateVersionEdits(cfd, edit);
          }
          --to_level;
        }
      }
    }
  }
  if (is_new_db) {
    // DB id was already set up by NewDB() above; nothing more to do.
  } else if (immutable_db_options_.write_dbid_to_manifest && recovery_ctx) {
    VersionEdit edit;
    s = SetupDBId(write_options, read_only, is_new_db, is_retry, &edit);
    recovery_ctx->UpdateVersionEdits(
        versions_->GetColumnFamilySet()->GetDefault(), edit);
  } else {
    s = SetupDBId(write_options, read_only, is_new_db, is_retry, nullptr);
  }
  assert(!s.ok() || !db_id_.empty());
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
  if (s.ok() && !read_only) {
    s = MaybeUpdateNextFileNumber(recovery_ctx);
  }
  if (s.ok() && !read_only) {
    // Create any cf_paths directories the column families need.
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }
  std::vector<std::string> files_in_wal_dir;
  if (s.ok()) {
    // Initialize memory accounting and the default CF handle.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options.write_buffer_size *
                                    mutable_cf_options.max_write_buffer_number;
    }
    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    // Collect the WAL files present in wal_dir.
    auto wal_dir = immutable_db_options_.GetWalDir();
    if (!immutable_db_options_.best_efforts_recovery) {
      IOOptions io_opts;
      io_opts.do_not_recurse = true;
      s = immutable_db_options_.fs->GetChildren(
          wal_dir, io_opts, &files_in_wal_dir, nullptr);
    }
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found", wal_dir);
    } else if (!s.ok()) {
      return s;
    }
    std::unordered_map<uint64_t, std::string> wal_files;
    for (const auto& file : files_in_wal_dir) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
        if (is_new_db) {
          // A freshly created DB must not find pre-existing WALs.
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              file);
        } else {
          wal_files[number] = LogFileName(wal_dir, number);
        }
      }
    }
    // An existing, normally tracked DB should always have at least one WAL.
    if (immutable_db_options_.track_and_verify_wals && !is_new_db &&
        !immutable_db_options_.best_efforts_recovery && wal_files.empty()) {
      return Status::Corruption("Opening an existing DB with no WAL files");
    }
    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
      if (!immutable_db_options_.best_efforts_recovery) {
        // Verify the on-disk WALs against those recorded in the MANIFEST.
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
      }
    } else if (!versions_->GetWalSet().GetWals().empty()) {
      // Tracking was turned off: drop all WAL records from the MANIFEST.
      VersionEdit edit;
      WalNumber max_wal_number =
          versions_->GetWalSet().GetWals().rbegin()->first;
      edit.DeleteWalsBefore(max_wal_number + 1);
      assert(recovery_ctx != nullptr);
      assert(versions_->GetColumnFamilySet() != nullptr);
      recovery_ctx->UpdateVersionEdits(
          versions_->GetColumnFamilySet()->GetDefault(), edit);
    }
    if (!s.ok()) {
      return s;
    }
    if (!wal_files.empty()) {
      if (error_if_wal_file_exists) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_wal_file_exists"
            "flag but a WAL file already exists");
      } else if (error_if_data_exists_in_wals) {
        // WALs may exist but must all be empty.
        for (auto& wal_file : wal_files) {
          uint64_t bytes;
          s = env_->GetFileSize(wal_file.second, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_wals is set but there are data "
                  " in WAL files.");
            }
          }
        }
      }
    }
    if (!wal_files.empty()) {
      // Replay WALs in chronological (ascending number) order.
      std::vector<uint64_t> wals;
      wals.reserve(wal_files.size());
      for (const auto& wal_file : wal_files) {
        wals.push_back(wal_file.first);
      }
      std::sort(wals.begin(), wals.end());
      bool corrupted_wal_found = false;
      s = RecoverLogFiles(wals, &next_sequence, read_only, is_retry,
                          &corrupted_wal_found, recovery_ctx);
      if (corrupted_wal_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Replay failed: reset memtables so the new WALs are not reused.
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(kMaxSequenceNumber);
        }
      }
    }
  }
  if (read_only) {
    // Locate the most recent OPTIONS file (number and size) for reporting.
    std::vector<std::string> filenames;
    if (s.ok()) {
      const std::string normalized_dbname = NormalizePath(dbname_);
      const std::string normalized_wal_dir =
          NormalizePath(immutable_db_options_.GetWalDir());
      // Reuse an already-fetched listing when possible.
      if (immutable_db_options_.best_efforts_recovery) {
        filenames = std::move(files_in_dbname);
      } else if (normalized_dbname == normalized_wal_dir) {
        filenames = std::move(files_in_wal_dir);
      } else {
        IOOptions io_opts;
        io_opts.do_not_recurse = true;
        s = immutable_db_options_.fs->GetChildren(
            GetName(), io_opts, &filenames, nullptr);
      }
    }
    if (s.ok()) {
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      for (const auto& fname : filenames) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
      uint64_t options_file_size = 0;
      if (options_file_number > 0) {
        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
                              &options_file_size);
      }
      versions_->options_file_size_ = options_file_size;
    }
  }
  return s;
}
// Checks (and if needed re-creates) the persistent stats column family so its
// on-disk format version is compatible with this binary, then persists the
// current format/compatible version keys when required.
// PRE: mutex_ held; POST: mutex_ held (released around the I/O below).
Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // If the CF did not exist yet, the version keys must be written for it.
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Read back the stored format version and its compatibility floor.
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // Recreate the CF when the version keys are unreadable, or the stored
    // version is newer than both what we write and what we can read.
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family since reading "
            "persistent stats version key failed. Format key: %s, compatible "
            "key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family due to corrupted or "
            "incompatible format version. Recovered format: %" PRIu64
            "; recovered format compatible since: %" PRIu64 "\n",
            format_version_recovered, compatible_version_recovered);
      }
      // Drop and rebuild the stats CF from scratch.
      s = DropColumnFamily(persist_stats_cf_handle_);
      if (s.ok()) {
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      }
      ColumnFamilyHandle* handle = nullptr;
      if (s.ok()) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
                                   WriteOptions(Env::IOActivity::kDBOpen), cfo,
                                   kPersistentStatsColumnFamilyName, &handle);
      }
      if (s.ok()) {
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
        // Fresh CF: its version keys must be (re)written below.
        should_persist_format_version = true;
      }
    }
  }
  if (should_persist_format_version) {
    // Record the versions this binary writes, atomically in a single batch.
    WriteBatch batch;
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
                    std::to_string(kStatsCFCurrentFormatVersion));
    }
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
                    std::to_string(kStatsCFCompatibleFormatVersion));
    }
    if (s.ok()) {
      // Low-priority, non-blocking, unsynced write: stats are best-effort.
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
  }
  mutex_.Lock();
  return s;
}
// Locates the persistent stats column family if it was recovered from the
// MANIFEST, otherwise creates it. Leaves persist_stats_cf_handle_ set (may be
// null if creation failed) and records whether the CF pre-existed.
// PRE/POST: mutex_ held (temporarily released while creating the CF).
Status DBImpl::InitPersistStatsColumnFamily() {
  mutex_.AssertHeld();
  assert(!persist_stats_cf_handle_);
  ColumnFamilyData* persistent_stats_cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(
          kPersistentStatsColumnFamilyName);
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
  Status s;
  if (persistent_stats_cfd != nullptr) {
    // CF already recovered: just wrap it in a handle.
    persist_stats_cf_handle_ =
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  } else {
    // CF creation does its own locking, so release the DB mutex around it.
    mutex_.Unlock();
    ColumnFamilyHandle* handle = nullptr;
    ColumnFamilyOptions cfo;
    OptimizeForPersistentStats(&cfo);
    s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
                               WriteOptions(Env::IOActivity::kDBOpen), cfo,
                               kPersistentStatsColumnFamilyName, &handle);
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
    mutex_.Lock();
  }
  return s;
}
// Applies all version edits accumulated during recovery to the MANIFEST in
// one LogAndApply call. Must run before any descriptor log has been created.
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
  mutex_.AssertHeld();
  assert(versions_->descriptor_log_ == nullptr);
  const ReadOptions read_opts(Env::IOActivity::kDBOpen);
  const WriteOptions write_opts(Env::IOActivity::kDBOpen);
  return versions_->LogAndApply(recovery_ctx.cfds_, read_opts, write_opts,
                                recovery_ctx.edit_lists_, &mutex_,
                                directories_.GetDbDir());
}
// Hands the user's WAL filter (if any) the column-family name->id and
// id->log-number maps so it can associate replayed records with column
// families. No-op when no filter is installed.
void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
  if (immutable_db_options_.wal_filter == nullptr) {
    return;
  }
  // (Removed a redundant assert that re-checked the null test above.)
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
  std::map<std::string, uint32_t> cf_name_id_map;
  std::map<uint32_t, uint64_t> cf_lognumber_map;
  assert(versions_);
  assert(versions_->GetColumnFamilySet());
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    assert(cfd);
    // emplace constructs in place, avoiding the intermediate std::pair
    // temporaries that insert(std::make_pair(...)) creates.
    cf_name_id_map.emplace(cfd->GetName(), cfd->GetID());
    cf_lognumber_map.emplace(cfd->GetID(), cfd->GetLogNumber());
  }
  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
}
// Runs the user's WAL filter on a single recovered record. Returns true when
// `batch` should be applied (possibly rewritten in place by the filter),
// false when the record must be skipped. May set `status` and `stop_replay`
// according to the filter's verdict. No-op (returns true) without a filter.
bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
                                                const std::string& wal_fname,
                                                log::Reader::Reporter& reporter,
                                                Status& status,
                                                bool& stop_replay,
                                                WriteBatch& batch) {
  if (immutable_db_options_.wal_filter == nullptr) {
    return true;
  }
  assert(immutable_db_options_.wal_filter != nullptr);
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
  WriteBatch new_batch;
  bool batch_changed = false;
  bool process_current_record = true;
  WalFilter::WalProcessingOption wal_processing_option =
      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
                                &batch_changed);
  switch (wal_processing_option) {
    case WalFilter::WalProcessingOption::kContinueProcessing:
      break;
    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
      // Drop this record only; replay continues.
      process_current_record = false;
      break;
    case WalFilter::WalProcessingOption::kStopReplay:
      // Drop this record and stop replaying any further WAL data.
      process_current_record = false;
      stop_replay = true;
      break;
    case WalFilter::WalProcessingOption::kCorruptedRecord: {
      status = Status::Corruption("Corruption reported by Wal Filter ",
                                  wal_filter.Name());
      // MaybeIgnoreError may clear the error depending on recovery settings.
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        process_current_record = false;
        reporter.Corruption(batch.GetDataSize(), status);
      }
      break;
    }
    default: {
      // Unknown enum value returned by a (possibly newer) filter.
      assert(false);
      status = Status::NotSupported(
          "Unknown WalProcessingOption returned by Wal Filter ",
          wal_filter.Name());
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // Stop replay entirely; the filter contract was violated.
        stop_replay = true;
      }
      break;
    }
  }
  if (!process_current_record) {
    return false;
  }
  if (batch_changed) {
    // A filter may drop entries but never add any: enforce that the rewritten
    // batch has no more entries than the original.
    int new_count = WriteBatchInternal::Count(&new_batch);
    int original_count = WriteBatchInternal::Count(&batch);
    if (new_count > original_count) {
      ROCKS_LOG_FATAL(
          immutable_db_options_.info_log,
          "Recovering log #%" PRIu64
          " mode %d log filter %s returned "
          "more records (%d) than original (%d) which is not allowed. "
          "Aborting recovery.",
          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
          wal_filter.Name(), new_count, original_count);
      status = Status::NotSupported(
          "More than original # of records "
          "returned by Wal Filter ",
          wal_filter.Name());
      return false;
    }
    // Preserve the original sequence number on the rewritten batch.
    WriteBatchInternal::SetSequence(&new_batch,
                                    WriteBatchInternal::Sequence(&batch));
    batch = new_batch;
  }
  return true;
}
// log::Reader reporter callback for a corrupted WAL chunk hit during DB open.
// Always logs; additionally records the first corruption (and its WAL number)
// when a status sink is attached. Later corruptions are log-only.
void DBOpenLogRecordReadReporter::Corruption(size_t bytes, const Status& s,
                                             uint64_t log_number) {
  ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                 (status == nullptr ? "(ignoring error) " : ""), fname,
                 static_cast<int>(bytes), s.ToString().c_str());
  // Keep only the first error observed.
  if (status != nullptr && status->ok()) {
    *status = s;
    corrupted_wal_number_ = log_number;
  }
}
// log::Reader reporter callback for data that appears to be a stale leftover
// from a recycled WAL; flags it (when a sink is attached) rather than
// treating it as corruption.
void DBOpenLogRecordReadReporter::OldLogRecord(size_t bytes) {
  if (old_log_record != nullptr) {
    *old_log_record = true;
  }
  ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled", fname,
                 static_cast<int>(bytes));
}
// Replays the given WALs: prepares per-CF version edits and recovery
// bookkeeping, processes every WAL in order, then emits the closing recovery
// event. Requires mutex_ held.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
                               SequenceNumber* next_sequence, bool read_only,
                               bool is_retry, bool* corrupted_wal_found,
                               RecoveryContext* recovery_ctx) {
  mutex_.AssertHeld();
  std::unordered_map<int, VersionEdit> edits_by_cf;
  int recovery_job_id = 0;
  uint64_t min_wal = 0;
  SetupLogFilesRecovery(wal_numbers, &edits_by_cf, &recovery_job_id, &min_wal);
  const Status replay_status = ProcessLogFiles(
      wal_numbers, read_only, is_retry, min_wal, recovery_job_id,
      next_sequence, &edits_by_cf, corrupted_wal_found, recovery_ctx);
  FinishLogFilesRecovery(recovery_job_id, replay_status);
  return replay_status;
}
void DBImpl::SetupLogFilesRecovery(
const std::vector<uint64_t>& wal_numbers,
std::unordered_map<int, VersionEdit>* version_edits, int* job_id,
uint64_t* min_wal_number) {
assert(version_edits);
assert(job_id);
assert(min_wal_number);
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits->insert({cfd->GetID(), edit});
}
*job_id = next_job_id_.fetch_add(1);
{
auto stream = event_logger_.Log();
stream << "job" << *job_id;
stream << "event" << "recovery_started";
stream << "wal_files";
stream.StartArray();
for (auto wal_number : wal_numbers) {
stream << wal_number;
}
stream.EndArray();
}
InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
*min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
*min_wal_number =
std::max(*min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
}
// Replays the given WALs in order, stopping at the first error. After each
// WAL it checks that the sequence number never moved backwards; after all
// WALs it resolves any corruption-induced stop and either flushes the final
// memtables or restores the active log files.
Status DBImpl::ProcessLogFiles(
    const std::vector<uint64_t>& wal_numbers, bool read_only, bool is_retry,
    uint64_t min_wal_number, int job_id, SequenceNumber* next_sequence,
    std::unordered_map<int, VersionEdit>* version_edits,
    bool* corrupted_wal_found, RecoveryContext* recovery_ctx) {
  Status status;
  bool stop_replay_by_wal_filter = false;
  bool stop_replay_for_corruption = false;
  bool flushed = false;
  uint64_t corrupted_wal_number = kMaxSequenceNumber;
  PredecessorWALInfo predecessor_wal_info;
  for (auto wal_number : wal_numbers) {
    if (!status.ok()) {
      break;
    }
    SequenceNumber prev_next_sequence = *next_sequence;
    // FIX: the original re-tested status.ok() here although the break above
    // already guarantees it; call ProcessLogFile unconditionally.
    status = ProcessLogFile(
        wal_number, min_wal_number, is_retry, read_only, job_id, next_sequence,
        &stop_replay_for_corruption, &stop_replay_by_wal_filter,
        &corrupted_wal_number, corrupted_wal_found, version_edits, &flushed,
        predecessor_wal_info);
    if (status.ok()) {
      // The recovered sequence number must be monotonic across WALs.
      status = CheckSeqnoNotSetBackDuringRecovery(prev_next_sequence,
                                                  *next_sequence);
    }
  }
  if (status.ok()) {
    status = MaybeHandleStopReplayForCorruptionForInconsistency(
        stop_replay_for_corruption, corrupted_wal_number);
  }
  if (status.ok()) {
    status = MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
        wal_numbers, read_only, job_id, flushed, version_edits, recovery_ctx);
  }
  return status;
}
// Replays a single WAL `wal_number` into the memtables during recovery.
// Out-params communicate replay state across WALs: stop flags (WAL filter /
// corruption), the corrupted WAL number, per-CF version edits for any
// mid-replay flushes, and the predecessor-WAL info used to verify the next
// WAL's continuity. Returns non-OK only for errors that should fail the
// whole recovery; tolerated corruption is handled per wal_recovery_mode.
Status DBImpl::ProcessLogFile(
    uint64_t wal_number, uint64_t min_wal_number, bool is_retry, bool read_only,
    int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
    bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number,
    bool* corrupted_wal_found,
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
    PredecessorWALInfo& predecessor_wal_info) {
  assert(stop_replay_by_wal_filter);
  Status status;
  bool old_log_record = false;
  DBOpenLogRecordReadReporter reporter;
  std::unique_ptr<log::Reader> reader;
  std::string fname =
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
  // Warn with the number of bytes being discarded whenever this WAL (or its
  // remainder) is dropped.
  auto logFileDropped = [this, &fname]() {
    uint64_t bytes;
    if (env_->GetFileSize(fname, &bytes).ok()) {
      auto info_log = immutable_db_options_.info_log.get();
      ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                     static_cast<int>(bytes));
    }
  };
  std::string scratch;
  Slice record;
  uint64_t record_checksum;
  const UnorderedMap<uint32_t, size_t>& running_ts_sz =
      versions_->GetRunningColumnFamiliesTimestampSize();
  SequenceNumber last_seqno_observed = 0;
  // WALs below the minimum number to keep contain only already-flushed data.
  if (wal_number < min_wal_number) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Skipping log #%" PRIu64
                   " since it is older than min log to keep #%" PRIu64,
                   wal_number, min_wal_number);
    assert(status.ok());
    return status;
  }
  SetupLogFileProcessing(wal_number);
  // A WAL filter may have requested that all remaining WALs be dropped.
  if (*stop_replay_by_wal_filter) {
    logFileDropped();
    assert(status.ok());
    return status;
  }
  Status init_status = InitializeLogReader(
      wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number,
      predecessor_wal_info, &old_log_record, &status, &reporter, reader);
  if (!init_status.ok()) {
    assert(status.ok());
    status.PermitUncheckedError();
    return init_status;
  } else if (reader == nullptr) {
    // The WAL could not be opened but the error was ignored (see
    // MaybeIgnoreError() inside InitializeLogReader); skip this file.
    assert(status.ok());
    return status;
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
                           nullptr);
  // Read all records and apply them to memtables. Note that `status` may be
  // set by the reporter (on corrupted records) between iterations.
  while (true) {
    if (*stop_replay_by_wal_filter) {
      break;
    }
    bool read_record = reader->ReadRecord(
        &record, &scratch, immutable_db_options_.wal_recovery_mode,
        &record_checksum);
    if (!read_record || !status.ok()) {
      break;
    }
    SequenceNumber prev_next_sequence = *next_sequence;
    Status process_status = ProcessLogRecord(
        record, reader, running_ts_sz, wal_number, fname, read_only, job_id,
        logFileDropped, &reporter, &record_checksum, &last_seqno_observed,
        next_sequence, stop_replay_for_corruption, &status,
        stop_replay_by_wal_filter, version_edits, flushed);
    if (!process_status.ok()) {
      return process_status;
    } else if (Status seqno_check_status = CheckSeqnoNotSetBackDuringRecovery(
                   prev_next_sequence, *next_sequence);
               !seqno_check_status.ok()) {
      return seqno_check_status;
    } else if (*stop_replay_for_corruption) {
      break;
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Recovered to log #%" PRIu64 " next seq #%" PRIu64, wal_number,
                 *next_sequence);
  // Record this WAL's size and last observed seqno so the next WAL's reader
  // can verify continuity against it.
  if (status.ok()) {
    status = UpdatePredecessorWALInfo(wal_number, last_seqno_observed, fname,
                                      predecessor_wal_info);
  }
  // Resolve failures/old records according to the configured recovery mode.
  if (!status.ok() || old_log_record) {
    status = HandleNonOkStatusOrOldLogRecord(
        wal_number, next_sequence, status, reporter, &old_log_record,
        stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
  }
  FinishLogFileProcessing(status, next_sequence);
  return status;
}
// Marks `wal_number` as used (so it is never reissued for a new file) and
// logs the start of its replay along with the active recovery mode.
void DBImpl::SetupLogFileProcessing(uint64_t wal_number) {
  versions_->MarkFileNumberUsed(wal_number);
  const int recovery_mode =
      static_cast<int>(immutable_db_options_.wal_recovery_mode);
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Recovering log #%" PRIu64 " mode %d", wal_number,
                 recovery_mode);
}
// Opens WAL `fname` for sequential reading and constructs a log::Reader for
// it in `reader`. On an open failure the error may be downgraded to OK by
// MaybeIgnoreError(); in that case the function returns OK with `reader`
// left null, which the caller treats as "skip this WAL".
Status DBImpl::InitializeLogReader(
    uint64_t wal_number, bool is_retry, std::string& fname,
    bool stop_replay_for_corruption, uint64_t min_wal_number,
    const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
    Status* const reporter_status, DBOpenLogRecordReadReporter* reporter,
    std::unique_ptr<log::Reader>& reader) {
  assert(old_log_record);
  assert(reporter_status);
  assert(reporter);
  Status status;
  std::unique_ptr<SequentialFileReader> file_reader;
  {
    std::unique_ptr<FSSequentialFile> file;
    status = fs_->NewSequentialFile(
        fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
    if (!status.ok()) {
      // Depending on paranoid_checks, the open failure may be ignored
      // (status reset to OK); `reader` stays null either way.
      MaybeIgnoreError(&status);
      return status;
    }
    file_reader.reset(new SequentialFileReader(
        std::move(file), fname, immutable_db_options_.log_readahead_size,
        io_tracer_, {}, nullptr,
        is_retry));
  }
  // Wire up the corruption reporter used by log::Reader.
  reporter->env = env_;
  reporter->info_log = immutable_db_options_.info_log.get();
  reporter->fname = fname.c_str();
  reporter->old_log_record = old_log_record;
  // Corruption only feeds back into `reporter_status` (and thus fails the
  // replay loop) in strict modes; otherwise it is logged and ignored.
  if (!immutable_db_options_.paranoid_checks ||
      immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
    reporter->status = nullptr;
  } else {
    reporter->status = reporter_status;
  }
  // Checksumming is enabled (first bool arg) even when paranoid_checks is
  // off, so a corrupted record skips the whole commit rather than feeding
  // bad data (e.g. a bogus sequence number) into replay.
  reader.reset(new log::Reader(
      immutable_db_options_.info_log, std::move(file_reader), reporter,
      true /* checksum */, wal_number,
      immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption,
      min_wal_number, predecessor_wal_info));
  return status;
}
// Applies one WAL record (a serialized WriteBatch) to the memtables.
// Returns non-OK only for errors that must abort recovery; recoverable
// per-record corruption is routed through `reporter` (which may set
// `*status` read by the caller's loop) and leaves the return value OK.
Status DBImpl::ProcessLogRecord(
    Slice record, const std::unique_ptr<log::Reader>& reader,
    const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
    const std::string& fname, bool read_only, int job_id,
    const std::function<void()>& logFileDropped,
    DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
    SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
    bool* stop_replay_for_corruption, Status* status,
    bool* stop_replay_by_wal_filter,
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
  assert(reporter);
  assert(last_seqno_observed);
  assert(stop_replay_for_corruption);
  assert(status);
  assert(stop_replay_by_wal_filter);
  Status process_status;
  bool has_valid_writes = false;
  WriteBatch batch;
  // `new_batch` is only populated when the batch must be rewritten to
  // reconcile user-defined timestamp sizes; `batch_to_use` points at
  // whichever batch should actually be replayed.
  std::unique_ptr<WriteBatch> new_batch;
  WriteBatch* batch_to_use = nullptr;
  // A record smaller than the WriteBatch header cannot be valid.
  if (record.size() < WriteBatchInternal::kHeader) {
    reporter->Corruption(record.size(),
                         Status::Corruption("log record too small"));
    assert(process_status.ok());
    return process_status;
  }
  process_status = InitializeWriteBatchForLogRecord(
      record, reader, running_ts_sz, &batch, new_batch, batch_to_use,
      record_checksum);
  if (!process_status.ok()) {
    return process_status;
  }
  assert(batch_to_use);
  *last_seqno_observed = WriteBatchInternal::Sequence(batch_to_use);
  // Sequence numbers beyond kMaxSequenceNumber indicate a corrupt batch.
  if (*last_seqno_observed > kMaxSequenceNumber) {
    reporter->Corruption(
        record.size(),
        Status::Corruption("sequence " + std::to_string(*last_seqno_observed) +
                           " is too large"));
    assert(process_status.ok());
    return process_status;
  }
  // In point-in-time mode, a record whose seqno matches the expected next
  // seqno re-enables replay after an earlier corruption.
  MaybeReviseStopReplayForCorruption(*last_seqno_observed, next_sequence,
                                     stop_replay_for_corruption);
  if (*stop_replay_for_corruption) {
    logFileDropped();
    assert(process_status.ok());
    return process_status;
  }
  // The WAL filter may rewrite the batch, drop this record, or stop replay
  // entirely; a false return means this record is skipped.
  if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, *reporter, *status,
                                          *stop_replay_by_wal_filter,
                                          *batch_to_use)) {
    assert(process_status.ok());
    return process_status;
  } else {
    status->PermitUncheckedError();
  }
  assert(process_status.ok());
  process_status = InsertLogRecordToMemtable(batch_to_use, wal_number,
                                             next_sequence, &has_valid_writes);
  // An insertion failure is reported as corruption of this record but (after
  // MaybeIgnoreError possibly clears it) does not abort replay.
  MaybeIgnoreError(&process_status);
  if (!process_status.ok()) {
    reporter->Corruption(record.size(), process_status);
    process_status = Status::OK();
    return process_status;
  }
  // Flush any memtables that filled up while inserting this batch.
  process_status = MaybeWriteLevel0TableForRecovery(
      has_valid_writes, read_only, wal_number, job_id, next_sequence,
      version_edits, flushed);
  return process_status;
}
// Deserializes `record` into `*batch` and decides which batch replay should
// use: if the recorded user-defined timestamp sizes differ from the running
// ones, a reconciled copy is produced in `new_batch` and `batch_to_use`
// points at it; otherwise `batch_to_use` is `batch`. Finally (re)computes
// per-key protection info; the whole-record checksum is only reusable when
// the batch bytes were not rewritten.
Status DBImpl::InitializeWriteBatchForLogRecord(
    Slice record, const std::unique_ptr<log::Reader>& reader,
    const UnorderedMap<uint32_t, size_t>& running_ts_sz, WriteBatch* batch,
    std::unique_ptr<WriteBatch>& new_batch, WriteBatch*& batch_to_use,
    uint64_t* record_checksum) {
  assert(batch);
  assert(record_checksum);
  Status status = WriteBatchInternal::SetContents(batch, record);
  if (!status.ok()) {
    return status;
  }
  const UnorderedMap<uint32_t, size_t>& record_ts_sz =
      reader->GetRecordedTimestampSize();
  status = HandleWriteBatchTimestampSizeDifference(
      batch, running_ts_sz, record_ts_sz,
      TimestampSizeConsistencyMode::kReconcileInconsistency, seq_per_batch_,
      batch_per_txn_, &new_batch);
  if (!status.ok()) {
    return status;
  }
  bool batch_updated = new_batch != nullptr;
  batch_to_use = batch_updated ? new_batch.get() : batch;
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", batch_to_use);
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
      record_checksum);
  // The recorded checksum only covers the original record bytes, so it is
  // not passed along when the batch content was rewritten.
  status = WriteBatchInternal::UpdateProtectionInfo(
      batch_to_use, 8 /* protection bytes per key */,
      batch_updated ? nullptr : record_checksum);
  return status;
}
// Only point-in-time recovery can "un-stop" replay: observing a record
// whose sequence number lines up exactly with the expected next sequence
// means the earlier corruption left no gap after all.
void DBImpl::MaybeReviseStopReplayForCorruption(
    SequenceNumber sequence, SequenceNumber const* const next_sequence,
    bool* stop_replay_for_corruption) {
  if (immutable_db_options_.wal_recovery_mode !=
      WALRecoveryMode::kPointInTimeRecovery) {
    return;
  }
  assert(next_sequence);
  assert(stop_replay_for_corruption);
  if (sequence == *next_sequence) {
    *stop_replay_for_corruption = false;
  }
}
// Applies the recovered batch to the memtables of every column family it
// touches; `*next_sequence` is advanced past the batch's entries and
// `*has_valid_writes` reports whether any write was actually applied.
Status DBImpl::InsertLogRecordToMemtable(WriteBatch* batch_to_use,
                                         uint64_t wal_number,
                                         SequenceNumber* next_sequence,
                                         bool* has_valid_writes) {
  assert(batch_to_use);
  assert(has_valid_writes);
  return WriteBatchInternal::InsertInto(
      batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
      &trim_history_scheduler_, true, wal_number, this,
      false /* concurrent_memtable_writes */, next_sequence, has_valid_writes,
      seq_per_batch_, batch_per_txn_);
}
// If inserting a batch filled up memtables (i.e. the flush scheduler has
// column families queued) and we are not in read-only recovery, flushes
// each scheduled CF's memtable to an L0 table (recorded in the CF's
// VersionEdit) and starts a fresh memtable.
Status DBImpl::MaybeWriteLevel0TableForRecovery(
    bool has_valid_writes, bool read_only, uint64_t wal_number, int job_id,
    SequenceNumber const* const next_sequence,
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
  assert(next_sequence);
  assert(version_edits);
  assert(flushed);
  Status status;
  if (has_valid_writes && !read_only) {
    ColumnFamilyData* cfd;
    while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      // Drop the ref taken when the CF was scheduled for flush.
      // NOTE(review): `cfd` keeps being used below, so this assumes the CF
      // cannot actually be deleted during recovery -- confirm.
      cfd->UnrefAndTryDelete();
      // WALs are replayed in order, so a CF scheduled here cannot already
      // point past the WAL currently being replayed.
      assert(cfd->GetLogNumber() <= wal_number);
      (void)wal_number;
      auto iter = version_edits->find(cfd->GetID());
      assert(iter != version_edits->end());
      VersionEdit* edit = &iter->second;
      status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
      if (!status.ok()) {
        // Recovery failed.
        return status;
      }
      *flushed = true;
      // Continue replaying into a new memtable.
      cfd->CreateNewMemtable(*next_sequence - 1);
    }
  }
  return status;
}
// Resolves a non-OK replay status (or an "old log record") for WAL
// `wal_number` according to the configured WAL recovery mode:
// - kSkipAnyCorruptedRecords: swallow the error and keep going.
// - kPointInTimeRecovery: a genuine IO error fails recovery (possible loss
//   of synced WAL); any other failure records the corruption point, flags
//   replay to stop, and returns OK so the DB opens with the data so far.
// - kTolerateCorruptedTailRecords / kAbsoluteConsistency: propagate the
//   error to the caller.
// Status::NotSupported is always propagated unchanged.
Status DBImpl::HandleNonOkStatusOrOldLogRecord(
    uint64_t wal_number, SequenceNumber const* const next_sequence,
    Status status, const DBOpenLogRecordReadReporter& reporter,
    bool* old_log_record, bool* stop_replay_for_corruption,
    uint64_t* corrupted_wal_number, bool* corrupted_wal_found) {
  assert(!status.ok() || *old_log_record);
  assert(next_sequence);
  assert(old_log_record);
  assert(stop_replay_for_corruption);
  assert(corrupted_wal_number);
  // NotSupported is a configuration problem, not corruption; surface as-is.
  if (status.IsNotSupported()) {
    return status;
  }
  if (immutable_db_options_.wal_recovery_mode ==
      WALRecoveryMode::kSkipAnyCorruptedRecords) {
    // We should ignore the error but not continue replaying.
    return Status::OK();
  } else if (immutable_db_options_.wal_recovery_mode ==
             WALRecoveryMode::kPointInTimeRecovery) {
    if (status.IsIOError()) {
      // Fix: the previous message read "This likely mean loss" (grammar).
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "IOError during point-in-time reading log #%" PRIu64
                      " seq #%" PRIu64
                      ". %s. This likely means loss of synced WAL, "
                      "thus recovery fails.",
                      wal_number, *next_sequence, status.ToString().c_str());
      return status;
    }
    // Corruption (not an IO failure): remember where replay stopped, but
    // allow the DB to open with everything recovered up to this point.
    *old_log_record = false;
    *stop_replay_for_corruption = true;
    // Prefer the WAL number pinpointed by the read reporter; fall back to
    // the WAL currently being replayed.
    uint64_t reporter_corrupted_wal_number = reporter.GetCorruptedLogNumber();
    *corrupted_wal_number = reporter_corrupted_wal_number != kMaxSequenceNumber
                                ? reporter_corrupted_wal_number
                                : wal_number;
    if (corrupted_wal_found != nullptr) {
      *corrupted_wal_found = true;
    }
    return Status::OK();
  } else {
    assert(immutable_db_options_.wal_recovery_mode ==
               WALRecoveryMode::kTolerateCorruptedTailRecords ||
           immutable_db_options_.wal_recovery_mode ==
               WALRecoveryMode::kAbsoluteConsistency);
    return status;
  }
}
// Records this WAL's number, on-disk size, and last observed sequence
// number so the next WAL's reader can verify continuity against it.
Status DBImpl::UpdatePredecessorWALInfo(
    uint64_t wal_number, const SequenceNumber last_seqno_observed,
    const std::string& fname, PredecessorWALInfo& predecessor_wal_info) {
  uint64_t file_size = 0;
  const Status s = env_->GetFileSize(fname, &file_size);
  if (!s.ok()) {
    return s;
  }
  // Test hook: a sync-point callback may override the observed last seqno.
  SequenceNumber mock_seqno = kMaxSequenceNumber;
  [[maybe_unused]] std::pair<uint64_t, SequenceNumber*> pair =
      std::make_pair(wal_number, &mock_seqno);
  TEST_SYNC_POINT_CALLBACK("DBImpl::UpdatePredecessorWALInfo", &pair);
  const SequenceNumber seqno_to_record =
      (mock_seqno != kMaxSequenceNumber) ? mock_seqno : last_seqno_observed;
  predecessor_wal_info =
      PredecessorWALInfo(wal_number, file_size, seqno_to_record);
  return s;
}
// After a successfully replayed WAL, clears the per-WAL schedulers and
// publishes the last recovered sequence number if it advances the
// version set's current one.
void DBImpl::FinishLogFileProcessing(const Status& status,
                                     const SequenceNumber* next_sequence) {
  if (!status.ok()) {
    return;
  }
  assert(next_sequence);
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();
  const auto last_sequence = *next_sequence - 1;
  // kMaxSequenceNumber means no record was replayed from this WAL.
  const bool have_valid_seqno = (*next_sequence != kMaxSequenceNumber);
  if (have_valid_seqno && versions_->LastSequence() <= last_sequence) {
    versions_->SetLastAllocatedSequence(last_sequence);
    versions_->SetLastPublishedSequence(last_sequence);
    versions_->SetLastSequence(last_sequence);
  }
}
// When replay was cut short by corruption in a mode that tolerates a
// truncated WAL tail, verify that no column family's SST files contain
// data newer than the corruption point -- otherwise dropping the WAL tail
// would leave that CF inconsistent with the rest of the DB.
Status DBImpl::MaybeHandleStopReplayForCorruptionForInconsistency(
    bool stop_replay_for_corruption, uint64_t corrupted_wal_number) {
  const auto mode = immutable_db_options_.wal_recovery_mode;
  const bool tolerated_corruption_stop =
      stop_replay_for_corruption &&
      (mode == WALRecoveryMode::kPointInTimeRecovery ||
       mode == WALRecoveryMode::kTolerateCorruptedTailRecords);
  if (!tolerated_corruption_stop) {
    return Status::OK();
  }
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->GetLogNumber() > corrupted_wal_number &&
        cfd->GetLiveSstFilesSize() > 0) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Column family inconsistency: SST file contains data"
                      " beyond the point of corruption.");
      return Status::Corruption(
          "Column family inconsistency: SST file contains data"
          " beyond the point of corruption in CF " +
          cfd->GetName() +
          ". WAL recovery stopped at corruption point, but SST files"
          " contain newer data.");
    }
  }
  return Status::OK();
}
// Final step of WAL replay. For each column family, either flushes the
// remaining memtable contents to an L0 file or (with
// avoid_flush_during_recovery) keeps the replayed WALs alive so their data
// survives in the WALs. Log-number bumps and WAL-deletion edits are
// collected into `recovery_ctx` for one atomic MANIFEST write later.
Status DBImpl::MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
    const std::vector<uint64_t>& wal_numbers, bool read_only, int job_id,
    bool flushed, std::unordered_map<int, VersionEdit>* version_edits,
    RecoveryContext* recovery_ctx) {
  assert(version_edits);
  Status status;
  bool data_seen = false;
  if (!read_only) {
    const WalNumber max_wal_number = wal_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits->find(cfd->GetID());
      assert(iter != version_edits->end());
      VersionEdit* edit = &iter->second;
      // A CF whose log number is already beyond the newest replayed WAL was
      // not touched by recovery, so it must have no new data.
      if (cfd->GetLogNumber() > max_wal_number) {
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", nullptr);
      // A non-zero first sequence number means the memtable holds recovered
      // data.
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // Flush unless avoid_flush_during_recovery asks us to keep data in
        // the memtable (and no mid-replay flush already happened).
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed.
            break;
          }
          flushed = true;
          cfd->CreateNewMemtable(versions_->LastSequence());
        }
        data_seen = true;
      }
      // Advance the CF's log number past the replayed WALs; the edit is
      // written to the MANIFEST together with the others later.
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_wal_number + 1);
      }
    }
    if (status.ok()) {
      // Reserve max_wal_number + 1 so it is never reused for a new file.
      versions_->MarkFileNumberUsed(max_wal_number + 1);
      assert(recovery_ctx != nullptr);
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        auto iter = version_edits->find(cfd->GetID());
        assert(iter != version_edits->end());
        recovery_ctx->UpdateVersionEdits(cfd, iter->second);
      }
      // With everything flushed (or nothing unflushed), the replayed WALs
      // become obsolete and can be dropped from the MANIFEST.
      if (flushed || !data_seen) {
        VersionEdit wal_deletion;
        if (immutable_db_options_.track_and_verify_wals_in_manifest) {
          wal_deletion.DeleteWalsBefore(max_wal_number + 1);
        }
        if (!allow_2pc()) {
          wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
        }
        assert(versions_->GetColumnFamilySet() != nullptr);
        recovery_ctx->UpdateVersionEdits(
            versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
      }
    }
  }
  if (status.ok()) {
    if (data_seen && !flushed) {
      // Unflushed data remains in memtables: keep the backing WALs alive.
      status = RestoreAliveLogFiles(wal_numbers);
    } else if (!wal_numbers.empty()) {
      // Everything flushed: best-effort size check/truncate of the last WAL.
      const bool truncate = !read_only;
      GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
          .PermitUncheckedError();
    }
  }
  return status;
}
// Verifies that the "next sequence number" never moves backwards while
// replaying WALs; a regression indicates a bug or corruption.
// kMaxSequenceNumber as the previous value means "not yet initialized".
Status DBImpl::CheckSeqnoNotSetBackDuringRecovery(
    SequenceNumber prev_next_seqno, SequenceNumber current_next_seqno) {
  const bool went_backwards = (prev_next_seqno != kMaxSequenceNumber) &&
                              (prev_next_seqno > current_next_seqno);
  if (!went_backwards) {
    return Status::OK();
  }
  std::string msg =
      "Sequence number is being set backwards during recovery, this is likely "
      "a software bug or a data corruption. Prev next seqno: " +
      std::to_string(prev_next_seqno) +
      " , current next seqno: " + std::to_string(current_next_seqno);
  return Status::Corruption(msg);
}
// Emits the terminal recovery event so it can be matched with the
// "recovery_started" event logged under the same job id.
void DBImpl::FinishLogFilesRecovery(int job_id, const Status& status) {
  const char* event = status.ok() ? "recovery_finished" : "recovery_failed";
  event_logger_.Log() << "job" << job_id << "event" << event << "status"
                      << status.ToString();
}
// Looks up the current size of WAL `wal_number` and, when `truncate` is
// true, best-effort truncates the file to that size. Truncation failures
// (other than NotSupported) are only logged, never returned. On success,
// `*log_ptr` (if non-null) receives the number/size pair.
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
                                          WalFileNumberSize* log_ptr) {
  WalFileNumberSize log(wal_number);
  std::string fname =
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
  Status s;
  // Query the WAL's current on-disk size.
  s = env_->GetFileSize(fname, &log.size);
  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", &s);
  if (s.ok() && truncate) {
    std::unique_ptr<FSWritableFile> last_log;
    Status truncate_status = fs_->ReopenWritableFile(
        fname,
        fs_->OptimizeForLogWrite(
            file_options_,
            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
        &last_log, nullptr);
    if (truncate_status.ok()) {
      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
    }
    if (truncate_status.ok()) {
      truncate_status = last_log->Close(IOOptions(), nullptr);
    }
    // Failing to truncate is not critical; warn and keep the OK status.
    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
                     truncate_status.ToString().c_str());
    }
  }
  if (log_ptr) {
    *log_ptr = log;
  }
  return s;
}
// Re-registers the replayed WALs that still back unflushed memtable data as
// "alive" (tracked in alive_wal_files_ / wals_total_size_) so they are kept
// until that data is flushed. Only used when avoid_flush_during_recovery
// left data in memtables. The most recent WAL is also truncated to its
// recovered size.
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
  if (wal_numbers.empty()) {
    return Status::OK();
  }
  Status s;
  mutex_.AssertHeld();
  assert(immutable_db_options_.avoid_flush_during_recovery);
  // Reset the accounting before re-adding each surviving WAL below.
  wals_total_size_.StoreRelaxed(0);
  wal_empty_ = false;
  uint64_t min_wal_with_unflushed_data =
      versions_->MinLogNumberWithUnflushedData();
  for (auto wal_number : wal_numbers) {
    // In non-2PC mode, WALs older than the minimum WAL with unflushed data
    // back nothing and need not be kept alive.
    if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
      continue;
    }
    WalFileNumberSize log;
    // Only the newest WAL (the one that may get appended to) is truncated.
    s = GetLogSizeAndMaybeTruncate(
        wal_number, (wal_number == wal_numbers.back()), &log);
    if (!s.ok()) {
      break;
    }
    wals_total_size_.FetchAddRelaxed(log.size);
    alive_wal_files_.push_back(log);
  }
  return s;
}
// Builds an L0 SST file (plus any blob files) from memtable `mem` during
// recovery and records the result in `edit`. Called with mutex_ held; the
// mutex is released for the table build and re-acquired afterwards.
// Also runs paranoia checks comparing memtable entry counts against what
// was read and written, and accumulates flush statistics for the CF.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  // Recovery flushes the active memtable directly; there must not be any
  // immutable memtables yet.
  assert(std::numeric_limits<uint64_t>::max() ==
         cfd->imm()->GetEarliestMemTableID());
  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
  FileMetaData meta;
  std::vector<BlobFileAddition> blob_file_additions;
  // Pin the new file number in pending_outputs_ so the file cannot be
  // garbage-collected while it is being written.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  ro.io_activity = Env::IOActivity::kDBOpen;
  Arena arena;
  Status s;
  TableProperties table_properties;
  const auto* ucmp = cfd->internal_comparator().user_comparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  // If user-defined timestamps are not persisted, logically strip them while
  // iterating the memtable so the output file contains none.
  const bool logical_strip_timestamp =
      ts_sz > 0 && !cfd->ioptions().persist_user_defined_timestamps;
  InternalStats::CompactionStats flush_stats(CompactionReason::kFlush,
                                             1 /* count */);
  {
    // NOTE(review): the nullptr iterator args appear to be optional hooks
    // (comments were stripped) -- confirm against MemTable::NewIterator.
    ScopedArenaPtr<InternalIterator> iter(
        logical_strip_timestamp
            ? mem->NewTimestampStrippingIterator(
                  ro, nullptr, &arena,
                  nullptr, ts_sz)
            : mem->NewIterator(ro, nullptr, &arena,
                               nullptr,
                               true));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());
    // Snapshot the mutable CF options under the mutex before releasing it.
    const MutableCFOptions mutable_cf_options_copy =
        cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions().paranoid_file_checks;
    int64_t _current_time = 0;
    // Best effort: a failed clock read leaves the time at 0.
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
        .PermitUncheckedError();
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;
    meta.epoch_number = cfd->NewEpochNumber();
    {
      auto write_hint = cfd->current()->storage_info()->CalculateSSTWriteHint(
          0,
          immutable_db_options_.calculate_sst_write_lifetime_hint_set);
      // Release the DB mutex for the (potentially long) table build.
      mutex_.Unlock();
      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      SequenceNumber earliest_snapshot =
          (snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0));
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          logical_strip_timestamp
              ? mem->NewTimestampStrippingRangeTombstoneIterator(
                    ro, kMaxSequenceNumber, ts_sz)
              : mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
                                               false);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      IOStatus io_s;
      const ReadOptions read_option(Env::IOActivity::kDBOpen);
      const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen);
      TableBuilderOptions tboptions(
          cfd->ioptions(), mutable_cf_options_copy, read_option, write_option,
          cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(),
          GetCompressionFlush(cfd->ioptions(), mutable_cf_options_copy),
          mutable_cf_options_copy.compression_opts, cfd->GetID(),
          cfd->GetName(), 0, current_time,
          false, TableFileCreationReason::kRecovery,
          0, 0, db_id_,
          db_session_id_, 0, meta.fd.GetNumber(),
          kMaxSequenceNumber);
      Version* version = cfd->current();
      version->Ref();
      TableProperties temp_table_proerties;
      s = BuildTable(
          dbname_, versions_.get(), immutable_db_options_, tboptions,
          file_options_for_compaction_, cfd->table_cache(), iter.get(),
          std::move(range_del_iters), &meta, &blob_file_additions,
          snapshot_seqs, earliest_snapshot, earliest_write_conflict_snapshot,
          kMaxSequenceNumber, snapshot_checker, paranoid_file_checks,
          cfd->internal_stats(), &io_s, io_tracer_,
          BlobFileCreationReason::kRecovery,
          nullptr, &event_logger_, job_id,
          &temp_table_proerties, write_hint,
          nullptr, &blob_callback_, version,
          nullptr,
          nullptr, &flush_stats);
      version->Unref();
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();
      // Surface an IO error that BuildTable reported separately.
      if (!io_s.ok() && s.ok()) {
        s = io_s;
      }
      // Paranoia check: every memtable entry must have been read.
      uint64_t total_num_entries = mem->NumEntries();
      if (s.ok() && total_num_entries != flush_stats.num_input_records) {
        std::string msg = "Expected " + std::to_string(total_num_entries) +
                          " entries in memtable, but read " +
                          std::to_string(flush_stats.num_input_records);
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
                       cfd->GetName().c_str(), job_id, msg.c_str());
        if (immutable_db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
      // Paranoia check: the output's key count must match what was added.
      // Only applied to table factories known to report num_entries.
      const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
      if (s.ok() &&
          (mutable_cf_options.table_factory->IsInstanceOf(
               TableFactory::kBlockBasedTableName()) ||
           mutable_cf_options.table_factory->IsInstanceOf(
               TableFactory::kPlainTableName())) &&
          flush_stats.num_output_records != temp_table_proerties.num_entries) {
        std::string msg =
            "Number of keys in flush output SST files does not match "
            "number of keys added to the table. Expected " +
            std::to_string(flush_stats.num_output_records) + " but there are " +
            std::to_string(temp_table_proerties.num_entries) +
            " in output SST files";
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
                       cfd->GetName().c_str(), job_id, msg.c_str());
        if (immutable_db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  // A zero-sized output means no file was produced; nothing to install.
  const bool has_output = meta.fd.GetFileSize() > 0;
  constexpr int level = 0;
  if (s.ok() && has_output) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.temperature,
                  meta.oldest_blob_file_number, meta.oldest_ancester_time,
                  meta.file_creation_time, meta.epoch_number,
                  meta.file_checksum, meta.file_checksum_func_name,
                  meta.unique_id, meta.compensated_range_deletion_size,
                  meta.tail_size, meta.user_defined_timestamps_persisted);
    for (const auto& blob : blob_file_additions) {
      edit->AddBlobFile(blob);
    }
    // When timestamps were stripped, advance full_history_ts_low past the
    // newest stripped timestamp so reads cannot expect it back.
    if (logical_strip_timestamp) {
      Slice mem_newest_udt = mem->GetNewestUDT();
      std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
      if (full_history_ts_low.empty() ||
          ucmp->CompareTimestamp(mem_newest_udt, full_history_ts_low) >= 0) {
        std::string new_full_history_ts_low;
        GetFullHistoryTsLowFromU64CutoffTs(&mem_newest_udt,
                                           &new_full_history_ts_low);
        edit->SetFullHistoryTsLow(new_full_history_ts_low);
      }
    }
  }
  // Accumulate flush statistics for this column family.
  flush_stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
  if (has_output) {
    flush_stats.bytes_written = meta.fd.GetFileSize();
    flush_stats.num_output_files = 1;
  }
  const auto& blobs = edit->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    flush_stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }
  flush_stats.num_output_files_blob = static_cast<int>(blobs.size());
  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER,
                                            flush_stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      flush_stats.bytes_written + flush_stats.bytes_written_blob);
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
// Convenience overload: opens the DB with only the default column family
// (plus the hidden persistent-stats CF when enabled) and discards the
// returned handles -- the default CF handle can always be re-obtained from
// the DB object itself.
Status DB::Open(const Options& options, const std::string& dbname,
                std::unique_ptr<DB>* dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  if (db_options.persist_stats_to_disk) {
    column_families.emplace_back(kPersistentStatsColumnFamilyName, cf_options);
  }
  std::vector<ColumnFamilyHandle*> handles;
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  if (s.ok()) {
    assert(handles.size() ==
           (db_options.persist_stats_to_disk ? size_t{2} : size_t{1}));
    if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
      delete handles[1];
    }
    delete handles[0];
  }
  return s;
}
// Opens the DB with explicit column families, retrying the open as long as
// DBImpl::Open reports a retryable failure via `can_retry`. Thread-status
// tracking is enabled around the operation and reset on exit.
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles,
                std::unique_ptr<DB>* dbptr) {
  const bool kSeqPerBatch = true;
  const bool kBatchPerTxn = true;
  ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
  bool can_retry = false;
  Status s;
  while (true) {
    s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
                     !kSeqPerBatch, kBatchPerTxn, can_retry, &can_retry);
    if (s.ok() || !can_retry) {
      break;
    }
  }
  ThreadStatusUtil::ResetThreadStatus();
  return s;
}
// Opens the DB and then trims history newer than `trim_ts` in every column
// family with user-defined timestamps, via a forced bottommost compaction.
// Incompatible with avoid_flush_during_recovery. On any failure the CF
// handles are destroyed and the DB is closed before returning.
Status DB::OpenAndTrimHistory(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr,
    std::string trim_ts) {
  assert(dbptr != nullptr);
  assert(handles != nullptr);
  auto validate_options = [&db_options] {
    if (db_options.avoid_flush_during_recovery) {
      return Status::InvalidArgument(
          "avoid_flush_during_recovery incompatible with "
          "OpenAndTrimHistory");
    }
    return Status::OK();
  };
  auto s = validate_options();
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<DB> db;
  s = DB::Open(db_options, dbname, column_families, handles, &db);
  if (!s.ok()) {
    return s;
  }
  assert(db);
  CompactRangeOptions options;
  options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  auto db_impl = static_cast_with_check<DBImpl>(db.get());
  // Trim data newer than trim_ts in every CF that uses timestamps.
  for (auto handle : *handles) {
    assert(handle != nullptr);
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
    auto cfd = cfh->cfd();
    assert(cfd != nullptr);
    if (cfd->user_comparator() != nullptr &&
        cfd->user_comparator()->timestamp_size() > 0) {
      s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
                                        trim_ts);
      if (!s.ok()) {
        break;
      }
    }
  }
  // On failure, release every CF handle and close the DB before returning.
  auto clean_op = [&handles, &db] {
    for (auto handle : *handles) {
      auto temp_s = db->DestroyColumnFamilyHandle(handle);
      assert(temp_s.ok());
    }
    handles->clear();
    db.reset();
  };
  if (!s.ok()) {
    clean_op();
    return s;
  }
  *dbptr = std::move(db);
  return s;
}
// Creates the WAL file for `log_file_num` (reusing `recycle_log_number`'s
// file when non-zero) and returns a ready-to-use log::Writer via *new_log.
// The writer is initialized with a compression-type record and, via
// MaybeAddPredecessorWALInfo, the predecessor-WAL record used for
// recovery-time continuity verification.
IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
                           uint64_t log_file_num, uint64_t recycle_log_number,
                           size_t preallocate_block_size,
                           const PredecessorWALInfo& predecessor_wal_info,
                           log::Writer** new_log) {
  IOStatus io_s;
  std::unique_ptr<FSWritableFile> lfile;
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  FileOptions opt_file_options =
      fs_->OptimizeForLogWrite(file_options_, db_options);
  opt_file_options.write_hint = CalculateWALWriteHint();
  if (immutable_db_options_.wal_write_temperature != Temperature::kUnknown) {
    opt_file_options.temperature = immutable_db_options_.wal_write_temperature;
  }
  std::string wal_dir = immutable_db_options_.GetWalDir();
  std::string log_fname = LogFileName(wal_dir, log_file_num);
  if (recycle_log_number) {
    // Reuse a retired WAL's file space instead of allocating a new file.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
    io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
                                  &lfile, nullptr);
  } else {
    io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
  }
  if (io_s.ok()) {
    lfile->SetWriteLifeTimeHint(opt_file_options.write_hint);
    lfile->SetPreallocationBlockSize(preallocate_block_size);
    const auto& listeners = immutable_db_options_.listeners;
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    // Checksum handoff is enabled for the WAL only if kWalFile is in the
    // configured checksum_handoff_file_types set.
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(lfile), log_fname, opt_file_options,
        immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
        Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, listeners, nullptr,
        tmp_set.Contains(FileType::kWalFile),
        tmp_set.Contains(FileType::kWalFile)));
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
                               immutable_db_options_.recycle_log_file_num > 0,
                               immutable_db_options_.manual_wal_flush,
                               immutable_db_options_.wal_compression,
                               immutable_db_options_.track_and_verify_wals);
    io_s = (*new_log)->AddCompressionTypeRecord(write_options);
    if (io_s.ok()) {
      io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options,
                                                    predecessor_wal_info);
    }
  }
  return io_s;
}
// Registers the given data files with the file tracker (the boolean
// selects "track" as opposed to "untrack").
void DBImpl::TrackExistingDataFiles(
    const std::vector<std::string>& existing_data_files) {
  constexpr bool kTrack = true;
  TrackOrUntrackFiles(existing_data_files, kTrack);
}
// Opens (creating/recovering as needed) the DB described by `db_options`
// and `column_families`. On success, fills `handles` with one
// ColumnFamilyHandle per requested column family (caller owns them) and
// `dbptr` with the DB. On failure, `*dbptr` stays null and any handles
// created along the way are deleted before returning.
//
// `seq_per_batch` / `batch_per_txn` tune write-batch sequencing for
// transaction DB variants; `is_retry` / `can_retry` support retrying the
// open after certain recovery failures.
//
// Fix vs. previous revision: the "merge operator not supported" error
// message used a printf-style "%s" placeholder, but Status constructors
// take (msg, msg2) Slices and do no formatting, so "%s" was emitted
// literally. The message is now built without a placeholder and the CF
// name is passed as msg2 (Status renders "msg: msg2").
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
std::unique_ptr<DB>* dbptr, const bool seq_per_batch,
const bool batch_per_txn, const bool is_retry,
bool* can_retry) {
// All I/O issued on behalf of DB::Open is tagged with the kDBOpen
// activity so it can be attributed accordingly.
const WriteOptions write_options(Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
// Validate options up front; failures here leave no on-disk state.
Status s = ValidateOptionsByTable(db_options, column_families);
if (!s.ok()) {
return s;
}
s = ValidateOptions(db_options, column_families);
if (!s.ok()) {
return s;
}
*dbptr = nullptr;
assert(handles);
handles->clear();
// Track the largest write buffer across all CFs (used below to size WAL
// preallocation and the SstFileManager disk reservation) and merge the
// CFs' preserve/preclude-seconds settings.
size_t max_write_buffer_size = 0;
MinAndMaxPreserveSeconds preserve_info;
for (const auto& cf : column_families) {
max_write_buffer_size =
std::max(max_write_buffer_size, cf.options.write_buffer_size);
preserve_info.Combine(cf.options);
}
auto impl = std::make_unique<DBImpl>(db_options, dbname, seq_per_batch,
batch_per_txn);
// If the DBImpl constructor could not set up an info_log, surface the
// logger-creation error it recorded and abort the open.
if (!impl->immutable_db_options_.info_log) {
s = impl->init_logger_creation_s_;
return s;
} else {
assert(impl->init_logger_creation_s_.ok());
}
// Create the WAL dir plus every db_path and cf_path before touching any
// files.
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) {
paths.emplace_back(db_path.path);
}
for (auto& cf : column_families) {
for (auto& cf_path : cf.options.cf_paths) {
paths.emplace_back(cf_path.path);
}
}
for (const auto& path : paths) {
s = impl->env_->CreateDirIfMissing(path);
if (!s.ok()) {
break;
}
}
// Auto-recovery from background errors is only enabled when the DB
// uses at most one data path.
if (paths.size() <= 1) {
impl->error_handler_.EnableAutoRecovery();
}
}
if (s.ok()) {
s = impl->CreateArchivalDirectory();
}
if (!s.ok()) {
return s;
}
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
RecoveryContext recovery_ctx;
// options_mutex_ stays held until just before the final success/failure
// bookkeeping; mutex_ guards the recovery and handle-creation phases.
impl->options_mutex_.Lock();
impl->mutex_.Lock();
uint64_t recovered_seq(kMaxSequenceNumber);
// NOTE(review): the three bare `false` arguments are presumably
// read_only / error_if_wal_file_exists / error_if_data_exists_in_wals --
// confirm against DBImpl::Recover()'s declaration.
s = impl->Recover(column_families, false ,
false ,
false , is_retry,
&recovered_seq, &recovery_ctx, can_retry);
if (s.ok()) {
// Start a brand-new WAL for the freshly opened DB.
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
const size_t preallocate_block_size =
impl->GetWalPreallocateBlockSize(max_write_buffer_size);
// NOTE(review): the bare `0` and the default-constructed
// PredecessorWALInfo() presumably mean "no recycled WAL" and "no
// predecessor WAL info" -- confirm against CreateWAL()'s declaration.
s = impl->CreateWAL(write_options, new_log_number, 0 ,
preallocate_block_size,
PredecessorWALInfo() ,
&new_log);
if (s.ok()) {
// WAL numbers below the new one must never be recycled.
impl->min_wal_number_to_recycle_ = new_log_number;
}
if (s.ok()) {
// Publish the new WAL under wal_write_mutex_; it must be the only
// live log at this point.
InstrumentedMutexLock wl(&impl->wal_write_mutex_);
impl->cur_wal_number_ = new_log_number;
assert(new_log != nullptr);
assert(impl->logs_.empty());
impl->logs_.emplace_back(new_log_number, new_log);
}
if (s.ok()) {
impl->alive_wal_files_.emplace_back(impl->cur_wal_number_);
if (recovered_seq != kMaxSequenceNumber) {
// Recovery produced a sequence number that is not yet durable:
// write an empty batch stamped with that seqno into the new WAL,
// then flush and sync so the sequence number cannot regress after
// a crash.
WriteBatch empty_batch;
WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
uint64_t wal_used, log_size;
log::Writer* log_writer = impl->logs_.back().writer;
WalFileNumberSize& wal_file_number_size = impl->alive_wal_files_.back();
assert(log_writer->get_log_number() == wal_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, write_options, log_writer, &wal_used,
&log_size, wal_file_number_size, recovered_seq);
if (s.ok()) {
s = impl->FlushWAL(write_options, false);
TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", &s);
IOOptions opts;
if (s.ok()) {
s = WritableFileWriter::PrepareIOOptions(write_options, opts);
}
if (s.ok()) {
s = log_writer->file()->Sync(opts,
impl->immutable_db_options_.use_fsync);
}
}
}
}
}
// Persist the MANIFEST changes accumulated during recovery.
if (s.ok()) {
s = impl->LogAndApplyForRecovery(recovery_ctx);
}
// Best-effort removal of the IDENTITY file when the user opted out of
// writing it; failure to delete is deliberately ignored.
if (s.ok() && !impl->immutable_db_options_.write_identity_file) {
impl->env_->DeleteFile(IdentityFileName(impl->dbname_))
.PermitUncheckedError();
}
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
impl->mutex_.AssertHeld();
s = impl->InitPersistStatsColumnFamily();
}
// For a brand-new DB with preserve/preclude seconds configured, seed the
// seqno-to-time mapping.
if (s.ok() && recovery_ctx.is_new_db_ && preserve_info.IsEnabled()) {
impl->PrepopulateSeqnoToTimeMapping(preserve_info);
}
if (s.ok()) {
// Hand out a handle for every requested CF; CFs missing on disk are
// either created (create_missing_column_families) or reported as
// InvalidArgument.
for (const auto& cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl.get(), &impl->mutex_));
impl->NewThreadStatusCfInfo(cfd);
// NOTE(review): bare `true` is presumably "create superversion" --
// confirm against SuperVersionContext's constructor.
SuperVersionContext sv_context( true);
impl->InstallSuperVersionForConfigChange(cfd, &sv_context);
sv_context.Clean();
} else {
if (db_options.create_missing_column_families) {
ColumnFamilyHandle* handle = nullptr;
// CreateColumnFamilyImpl must run without mutex_ held.
impl->mutex_.Unlock();
s = impl->CreateColumnFamilyImpl(read_options, write_options,
cf.options, cf.name, &handle);
impl->mutex_.Lock();
if (s.ok()) {
handles->push_back(handle);
} else {
break;
}
} else {
s = Status::InvalidArgument("Column family not found", cf.name);
break;
}
}
}
}
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
assert(impl->persist_stats_cf_handle_);
assert(impl->persist_stats_cf_handle_->cfd());
SuperVersionContext sv_context( true);
impl->InstallSuperVersionForConfigChange(
impl->persist_stats_cf_handle_->cfd(), &sv_context);
sv_context.Clean();
s = impl->PersistentStatsProcessFormatVersion();
}
if (s.ok()) {
// Final per-CF capability checks: snapshot support and merge-operator
// support by the memtable implementation.
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsSnapshotSupported()) {
impl->is_snapshot_supported_ = false;
}
if (cfd->ioptions().merge_operator != nullptr &&
!cfd->mem()->IsMergeOperatorSupported()) {
// Status does no printf-style formatting; build the message
// directly and pass the CF name as msg2 ("msg: msg2").
s = Status::InvalidArgument(
"The memtable does not support merge operator while "
"options.merge_operator is non-null for column family",
cfd->GetName());
}
if (!s.ok()) {
break;
}
}
}
TEST_SYNC_POINT("DBImpl::Open:Opened");
// Persist the OPTIONS file; a failure here is remembered and converted
// to an open failure only after the DB is otherwise healthy.
Status persist_options_status;
if (s.ok()) {
// NOTE(review): bare `true` is presumably "db mutex already held" --
// confirm against WriteOptionsFile()'s declaration.
persist_options_status =
impl->WriteOptionsFile(write_options, true );
impl->opened_successfully_ = true;
} else {
persist_options_status.PermitUncheckedError();
}
impl->mutex_.Unlock();
// Wire up the SstFileManager (if any): statistics, tracking of existing
// files, and a disk reservation sized by the largest write buffer.
auto sfm = static_cast<SstFileManagerImpl*>(
impl->immutable_db_options_.sst_file_manager.get());
if (s.ok() && sfm) {
sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
"SstFileManager instance %p", sfm);
impl->TrackExistingDataFiles(recovery_ctx.existing_data_files_);
sfm->ReserveDiskBuffer(max_write_buffer_size,
impl->immutable_db_options_.db_paths[0].path);
}
if (s.ok()) {
// Clean up leftovers in every DB path and schedule any background
// work that recovery made necessary.
for (const auto& path : impl->CollectAllDBPaths()) {
DeleteScheduler::CleanupDirectory(impl->immutable_db_options_.env, sfm,
path)
.PermitUncheckedError();
}
impl->mutex_.Lock();
impl->DeleteObsoleteFiles();
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
impl->MaybeScheduleFlushOrCompaction();
impl->mutex_.Unlock();
}
if (s.ok()) {
ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
impl.get());
LogFlush(impl->immutable_db_options_.info_log);
// If anything is still buffered in the WAL (e.g. stats CF writes),
// flush and sync it before declaring the open durable.
if (!impl->WALBufferIsEmpty()) {
s = impl->FlushWAL(write_options, false);
if (s.ok()) {
log::Writer* log_writer = impl->logs_.back().writer;
IOOptions opts;
s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (s.ok()) {
s = log_writer->file()->Sync(opts,
impl->immutable_db_options_.use_fsync);
}
}
}
// A healthy DB with an unpersisted OPTIONS file is still an open
// failure.
if (s.ok() && !persist_options_status.ok()) {
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
}
if (!s.ok()) {
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
"DB::Open() failed: %s", s.ToString().c_str());
}
if (s.ok()) {
s = impl->StartPeriodicTaskScheduler();
}
if (s.ok()) {
s = impl->RegisterRecordSeqnoTimeWorker();
}
impl->options_mutex_.Unlock();
if (s.ok()) {
// Success: transfer ownership of the DBImpl to the caller.
*dbptr = std::move(impl);
} else {
// Failure: the handles are owned by the caller-facing vector only at
// this point, so delete them here before the DBImpl is destroyed.
for (auto* h : *handles) {
delete h;
}
handles->clear();
}
return s;
}
}