#pragma once
#include <utility>

#include "db/version_builder.h"
#include "db/version_edit.h"
#include "db/version_set.h"
namespace ROCKSDB_NAMESPACE {
struct FileMetaData;
// Abstract base for handlers that consume VersionEdit records read through a
// log::Reader. Concrete handlers must implement ApplyVersionEdit() and may
// override the remaining protected hooks, all of which default to "do
// nothing / succeed".
// NOTE(review): Iterate() is only declared in this header; the exact order in
// which it invokes the hooks below should be confirmed against its definition.
class VersionEditHandlerBase {
 public:
  // Creates a handler whose manifest read-size limit is the largest
  // representable uint64_t value.
  // `read_options` is kept by reference, so the referenced object must outlive
  // this handler.
  explicit VersionEditHandlerBase(const ReadOptions& read_options)
      : VersionEditHandlerBase(read_options,
                               std::numeric_limits<uint64_t>::max()) {}

  virtual ~VersionEditHandlerBase() = default;

  // Drives the handler over `reader`. `log_read_status` is an in/out status
  // owned by the caller (semantics defined out of line).
  void Iterate(log::Reader& reader, Status* log_read_status);

  // Status recorded by this handler.
  const Status& status() const { return status_; }

  // Mutable access to the internal atomic-group read buffer.
  AtomicGroupReadBuffer& GetReadBuffer() { return read_buffer_; }

 protected:
  // Same as the public constructor, but with an explicit read-size limit
  // (`max_read_size`). Only reachable from subclasses.
  explicit VersionEditHandlerBase(const ReadOptions& read_options,
                                  uint64_t max_read_size)
      : read_options_(read_options), max_manifest_read_size_(max_read_size) {}

  // Hook for subclass setup; the default reports success.
  virtual Status Initialize() { return Status::OK(); }

  // Processes a single edit. `cfd` is an out-parameter through which an
  // implementation may hand back a column family pointer.
  virtual Status ApplyVersionEdit(VersionEdit& edit,
                                  ColumnFamilyData** cfd) = 0;

  // Atomic-group notifications; the defaults report success.
  virtual Status OnAtomicGroupReplayBegin() { return Status::OK(); }
  virtual Status OnAtomicGroupReplayEnd() { return Status::OK(); }

  // Hook for inspecting the outcome of an iteration; the default is a no-op.
  virtual void CheckIterationResult(const log::Reader& /*reader*/,
                                    Status* /*s*/) {}

  // Empties the atomic-group read buffer.
  void ClearReadBuffer() { read_buffer_.Clear(); }

  // Exposed through status().
  Status status_;

  // Reference to the options supplied at construction (not owned).
  const ReadOptions& read_options_;

 private:
  AtomicGroupReadBuffer read_buffer_;

  // Read-size limit fixed at construction.
  const uint64_t max_manifest_read_size_;
};
// Handler that accumulates a mapping from column family ID to column family
// name. The mapping starts out with a single entry: ID 0 mapped to
// kDefaultColumnFamilyName.
class ListColumnFamiliesHandler : public VersionEditHandlerBase {
 public:
  explicit ListColumnFamiliesHandler(const ReadOptions& read_options)
      : VersionEditHandlerBase(read_options) {}

  ~ListColumnFamiliesHandler() override = default;

  // Returns a copy of the ID -> name mapping collected so far.
  //
  // The return type is deliberately a non-const value: a `const`-qualified
  // by-value return adds no protection for the handler's own state, but it
  // prevents callers from move-assigning the result and forces a second deep
  // copy of the map instead.
  std::map<uint32_t, std::string> GetColumnFamilyNames() const {
    return column_family_names_;
  }

 protected:
  // Per-edit callback (defined out of line). The column family out-parameter
  // is left unnamed in this declaration.
  Status ApplyVersionEdit(VersionEdit& edit,
                          ColumnFamilyData** /*cfd*/) override;

 private:
  // Column family ID -> name, pre-populated with the default column family.
  std::map<uint32_t, std::string> column_family_names_{
      {0, kDefaultColumnFamilyName}};
};
// Handler that gathers per-file checksum information while edits are being
// processed and can later export it into a FileChecksumList.
class FileChecksumRetriever : public VersionEditHandlerBase {
 public:
  // `max_read_size` is forwarded to the base class as its read-size limit.
  FileChecksumRetriever(const ReadOptions& read_options, uint64_t max_read_size)
      : VersionEditHandlerBase(read_options, max_read_size) {}

  ~FileChecksumRetriever() override = default;

  // Exports the gathered data into `file_checksum_list` (defined out of line).
  Status FetchFileChecksumList(FileChecksumList& file_checksum_list);

 protected:
  // Per-edit callback (defined out of line). The column family out-parameter
  // is left unnamed in this declaration.
  Status ApplyVersionEdit(VersionEdit& edit,
                          ColumnFamilyData** /*cfd*/) override;

 private:
  // Two-level map: uint32_t key -> (uint64_t key -> pair of strings).
  // NOTE(review): presumably column family ID -> file number ->
  // (checksum, checksum function name); confirm against the .cc file.
  using ChecksumsByFile =
      std::unordered_map<uint64_t, std::pair<std::string, std::string>>;
  std::unordered_map<uint32_t, ChecksumsByFile> cf_file_checksums_;
};
// Owning pointer to a BaseReferencedVersionBuilder; used as the mapped type of
// VersionEditHandler::builders_.
using VersionBuilderUPtr = std::unique_ptr<BaseReferencedVersionBuilder>;
// General-purpose VersionEdit handler bound to a VersionSet. It keeps one
// version builder per column family ID plus assorted bookkeeping, and exposes
// a set of virtual hooks that more specialised handlers override.
class VersionEditHandler : public VersionEditHandlerBase {
 public:
  // Public entry point. Delegates to the protected constructor with
  // `skip_load_table_files` fixed to false.
  explicit VersionEditHandler(
      bool read_only,
      const std::vector<ColumnFamilyDescriptor>& column_families,
      VersionSet* version_set, bool track_found_and_missing_files,
      bool no_error_if_files_missing,
      const std::shared_ptr<IOTracer>& io_tracer,
      const ReadOptions& read_options, bool allow_incomplete_valid_version,
      EpochNumberRequirement epoch_number_requirement =
          EpochNumberRequirement::kMustPresent)
      : VersionEditHandler(read_only, column_families, version_set,
                           track_found_and_missing_files,
                           no_error_if_files_missing, io_tracer, read_options,
                           /*skip_load_table_files=*/false,
                           allow_incomplete_valid_version,
                           epoch_number_requirement) {}

  ~VersionEditHandler() override = default;

  // Read-only view of the accumulated edit parameters.
  const VersionEditParams& GetVersionEditParams() const {
    return version_edit_params_;
  }

  // Copies the DB ID into `*db_id`. Does nothing when `db_id` is null or when
  // no DB ID has been recorded in the accumulated edit parameters.
  void GetDbId(std::string* db_id) const {
    if (db_id == nullptr || !version_edit_params_.HasDbId()) {
      return;
    }
    *db_id = version_edit_params_.GetDbId();
  }

  // File verification hook; this base implementation accepts every file.
  virtual Status VerifyFile(ColumnFamilyData* /*cfd*/,
                            const std::string& /*fpath*/, int /*level*/,
                            const FileMetaData& /*fmeta*/) {
    return Status::OK();
  }

  // Blob file verification hook; this base implementation accepts every file.
  virtual Status VerifyBlobFile(ColumnFamilyData* /*cfd*/,
                                uint64_t /*blob_file_num*/,
                                const BlobFileAddition& /*blob_addition*/) {
    return Status::OK();
  }

 protected:
  // Full constructor (defined out of line). Unlike the public one it takes
  // `column_families` by value and lets the caller choose
  // `skip_load_table_files`.
  explicit VersionEditHandler(
      bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
      VersionSet* version_set, bool track_found_and_missing_files,
      bool no_error_if_files_missing,
      const std::shared_ptr<IOTracer>& io_tracer,
      const ReadOptions& read_options, bool skip_load_table_files,
      bool allow_incomplete_valid_version,
      EpochNumberRequirement epoch_number_requirement =
          EpochNumberRequirement::kMustPresent);

  // Per-edit entry point, plus the per-edit-kind handlers declared alongside
  // it. All are defined out of line.
  Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override;

  virtual Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd);

  Status OnColumnFamilyDrop(VersionEdit& edit, ColumnFamilyData** cfd);

  Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd);

  Status OnWalAddition(VersionEdit& edit);

  Status OnWalDeletion(VersionEdit& edit);

  Status Initialize() override;

  // Reports, through the two out-parameters, how the column family referenced
  // by `edit` is classified (defined out of line).
  void CheckColumnFamilyId(const VersionEdit& edit, bool* do_not_open_cf,
                           bool* cf_in_builders) const;

  void CheckIterationResult(const log::Reader& reader, Status* s) override;

  ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options,
                                    const VersionEdit& edit);

  virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit);

  virtual Status MaybeCreateVersionBeforeApplyEdit(const VersionEdit& edit,
                                                   ColumnFamilyData* cfd,
                                                   bool force_create_version);

  virtual Status LoadTables(ColumnFamilyData* cfd,
                            bool prefetch_index_and_filter_in_cache,
                            bool is_initial_load);

  // True unless the bound VersionSet reports itself as unchanging.
  virtual bool MustOpenAllColumnFamilies() const {
    const bool version_set_is_unchanging = version_set_->unchanging();
    return !version_set_is_unchanging;
  }

  const bool read_only_;
  std::vector<ColumnFamilyDescriptor> column_families_;
  // Not owned (raw pointer supplied by the caller).
  VersionSet* version_set_;
  // Keyed by uint32_t; presumably the column family ID -- confirm in the .cc.
  std::unordered_map<uint32_t, VersionBuilderUPtr> builders_;
  std::unordered_map<std::string, ColumnFamilyOptions> name_to_options_;
  const bool track_found_and_missing_files_;
  std::unordered_map<uint32_t, std::string> do_not_open_column_families_;
  VersionEditParams version_edit_params_;
  bool no_error_if_files_missing_;
  std::shared_ptr<IOTracer> io_tracer_;
  bool skip_load_table_files_;
  bool initialized_;
  // May be null; subclasses that need it allocate the map themselves.
  std::unique_ptr<std::unordered_map<uint32_t, std::string>> cf_to_cmp_names_;
  const bool allow_incomplete_valid_version_;
  EpochNumberRequirement epoch_number_requirement_;
  std::unordered_set<uint32_t> cfds_to_mark_no_udt_;

 private:
  Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
                                    const VersionEdit& edit);

  Status MaybeHandleFileBoundariesForNewFiles(VersionEdit& edit,
                                              const ColumnFamilyData* cfd);
};
// VersionEditHandler specialisation that overrides the file verification,
// atomic-group, version-creation and table-loading hooks, and tracks Version
// objects per uint32_t key (presumably the column family ID -- confirm in the
// .cc file).
class VersionEditHandlerPointInTime : public VersionEditHandler {
 public:
  // Defined out of line.
  VersionEditHandlerPointInTime(
      bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
      VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
      const ReadOptions& read_options, bool allow_incomplete_valid_version,
      EpochNumberRequirement epoch_number_requirement =
          EpochNumberRequirement::kMustPresent);

  ~VersionEditHandlerPointInTime() override;

  bool HasMissingFiles() const;

  // Overrides of the verification hooks that the base class implements as
  // "always OK". `override` already implies `virtual`, so the keyword is not
  // repeated.
  Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, int level,
                    const FileMetaData& fmeta) override;

  Status VerifyBlobFile(ColumnFamilyData* cfd, uint64_t blob_file_num,
                        const BlobFileAddition& blob_addition) override;

 protected:
  Status OnAtomicGroupReplayBegin() override;

  Status OnAtomicGroupReplayEnd() override;

  void CheckIterationResult(const log::Reader& reader, Status* s) override;

  ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override;

  Status MaybeCreateVersionBeforeApplyEdit(const VersionEdit& edit,
                                           ColumnFamilyData* cfd,
                                           bool force_create_version) override;

  Status LoadTables(ColumnFamilyData* cfd,
                    bool prefetch_index_and_filter_in_cache,
                    bool is_initial_load) override;

  // Raw Version pointers; ownership rules are defined by the out-of-line
  // destructor and helpers.
  std::unordered_map<uint32_t, Version*> versions_;

  std::unordered_map<uint32_t, Version*> atomic_update_versions_;

  // Given a default value so the counter can never be read while
  // indeterminate, matching `in_atomic_group_` below. A mem-initializer in the
  // out-of-line constructor, if any, still takes precedence.
  size_t atomic_update_versions_missing_ = 0;

  bool in_atomic_group_ = false;

 private:
  // Helpers operating on atomic_update_versions_ (defined out of line).
  bool AtomicUpdateVersionsCompleted();

  bool AtomicUpdateVersionsContains(uint32_t cfid);

  void AtomicUpdateVersionsDropCf(uint32_t cfid);

  void AtomicUpdateVersionsPut(Version* version);

  void AtomicUpdateVersionsApply();
};
// Point-in-time handler that is always constructed in read-only mode with
// `allow_incomplete_valid_version` disabled. It starts in Mode::kRecovery and
// records the column families it has touched in cfds_changed_.
class ManifestTailer : public VersionEditHandlerPointInTime {
 public:
  // `column_families` is taken by value and then moved into the base class,
  // which also takes it by value; passing the named parameter straight through
  // would copy the whole vector a second time.
  explicit ManifestTailer(std::vector<ColumnFamilyDescriptor> column_families,
                          VersionSet* version_set,
                          const std::shared_ptr<IOTracer>& io_tracer,
                          const ReadOptions& read_options,
                          EpochNumberRequirement epoch_number_requirement =
                              EpochNumberRequirement::kMustPresent)
      : VersionEditHandlerPointInTime(
            /*read_only=*/true, std::move(column_families), version_set,
            io_tracer, read_options,
            /*allow_incomplete_valid_version=*/false,
            epoch_number_requirement),
        mode_(Mode::kRecovery) {}

  Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, int level,
                    const FileMetaData& fmeta) override;

  // Resets the handler before it is pointed at a different manifest: marks it
  // as not initialized and drops any buffered atomic-group data.
  void PrepareToReadNewManifest() {
    initialized_ = false;
    ClearReadBuffer();
  }

  // Mutable access to the set of column families recorded as changed.
  std::unordered_set<ColumnFamilyData*>& GetUpdatedColumnFamilies() {
    return cfds_changed_;
  }

  // Defined out of line.
  std::vector<std::string> GetAndClearIntermediateFiles();

 protected:
  Status Initialize() override;

  // This handler never requires every column family to be opened.
  bool MustOpenAllColumnFamilies() const override { return false; }

  Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override;

  Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd) override;

  void CheckIterationResult(const log::Reader& reader, Status* s) override;

  // Operating mode. Kept as an unscoped enum so that existing unqualified uses
  // of the enumerators elsewhere keep compiling.
  enum Mode : uint8_t {
    kRecovery = 0,
    kCatchUp = 1,
  };

  Mode mode_;

  // Non-owning pointers to the column families recorded as changed.
  std::unordered_set<ColumnFamilyData*> cfds_changed_;
};
// Handler that optionally prints every VersionEdit to stdout before applying
// it through the regular VersionEditHandler logic. It is always constructed
// read-only, with table loading skipped.
class DumpManifestHandler : public VersionEditHandler {
 public:
  // `verbose`, `hex` and `json` select the output format (see
  // ApplyVersionEdit()).
  //
  // `column_families` is taken by value and moved into the base class, whose
  // constructor also takes it by value; this avoids copying the vector twice.
  DumpManifestHandler(std::vector<ColumnFamilyDescriptor> column_families,
                      VersionSet* version_set,
                      const std::shared_ptr<IOTracer>& io_tracer,
                      const ReadOptions& read_options, bool verbose, bool hex,
                      bool json)
      : VersionEditHandler(
            /*read_only=*/true, std::move(column_families), version_set,
            /*track_found_and_missing_files=*/false,
            /*no_error_if_files_missing=*/false, io_tracer, read_options,
            /*skip_load_table_files=*/true,
            /*allow_incomplete_valid_version=*/false,
            EpochNumberRequirement::kMustPresent),
        verbose_(verbose),
        hex_(hex),
        json_(json),
        count_(0) {
    // The base class leaves this map unallocated; this handler starts with an
    // empty one.
    cf_to_cmp_names_.reset(new std::unordered_map<uint32_t, std::string>());
  }

  ~DumpManifestHandler() override = default;

  // Prints `edit` and then forwards it to VersionEditHandler.
  //  - json_:    writes edit.DebugJSON(count_, hex_) followed by a newline.
  //  - verbose_: (only when json_ is off) writes edit.DebugString(hex_) with
  //              no extra newline.
  //  - neither:  prints nothing.
  // The edit counter is incremented in every case, before the edit is applied.
  Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override {
    if (json_) {
      // count_ is passed as the running index of the edit.
      std::string edit_dump_str = edit.DebugJSON(count_, hex_);
      fwrite(edit_dump_str.data(), sizeof(char), edit_dump_str.size(), stdout);
      fwrite("\n", sizeof(char), 1, stdout);
    } else if (verbose_) {
      std::string edit_dump_str = edit.DebugString(hex_);
      fwrite(edit_dump_str.data(), sizeof(char), edit_dump_str.size(), stdout);
    }
    ++count_;
    return VersionEditHandler::ApplyVersionEdit(edit, cfd);
  }

  // Defined out of line.
  void CheckIterationResult(const log::Reader& reader, Status* s) override;

 private:
  const bool verbose_;
  const bool hex_;
  const bool json_;
  // Number of edits seen so far.
  int count_;
};
}