#include "db/version_set.h"
#include <algorithm>
#include <array>
#include <cinttypes>
#include <cstdio>
#include <list>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction.h"
#include "db/compaction/file_pri.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/manifest_ops.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit.h"
#include "db/version_edit_handler.h"
#include "db/wide/wide_columns_helper.h"
#include "file/file_util.h"
#include "table/compaction_merging_iterator.h"
#if USE_COROUTINES
#include "folly/coro/BlockingWait.h"
#include "folly/coro/Collect.h"
#endif
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "options/options_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/coro_utils.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"
#define WITHOUT_COROUTINES
#include "db/version_set_sync_and_async.h"
#undef WITHOUT_COROUTINES
#define WITH_COROUTINES
#include "db/version_set_sync_and_async.h"
#undef WITH_COROUTINES
namespace ROCKSDB_NAMESPACE {
namespace {
using ScanOptionsMap = std::unordered_map<size_t, MultiScanArgs>;
int FindFileInRange(const InternalKeyComparator& icmp,
const LevelFilesBrief& file_level, const Slice& key,
uint32_t left, uint32_t right) {
auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
};
const auto& b = file_level.files;
return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
Status OverlapWithIterator(const Comparator* ucmp,
const Slice& smallest_user_key,
const Slice& largest_user_key,
InternalIterator* iter, bool* overlap) {
InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
kValueTypeForSeek);
iter->Seek(range_start.Encode());
if (!iter->status().ok()) {
return iter->status();
}
*overlap = false;
if (iter->Valid()) {
ParsedInternalKey seek_result;
Status s = ParseInternalKey(iter->key(), &seek_result,
false ); if (!s.ok()) {
return s;
}
if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
0) {
*overlap = true;
}
}
return iter->status();
}
class FilePicker {
public:
FilePicker(const Slice& user_key, const Slice& ikey,
autovector<LevelFilesBrief>* file_levels, unsigned int num_levels,
FileIndexer* file_indexer, const Comparator* user_comparator,
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(static_cast<unsigned int>(-1)),
returned_file_level_(static_cast<unsigned int>(-1)),
hit_file_level_(static_cast<unsigned int>(-1)),
search_left_bound_(0),
search_right_bound_(FileIndexer::kLevelMaxIndex),
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
curr_file_level_(nullptr),
user_key_(user_key),
ikey_(ikey),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator) {
search_ended_ = !PrepareNextLevel();
if (!search_ended_) {
for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
if (r) {
r->Prepare(ikey);
}
}
}
}
int GetCurrentLevel() const { return curr_level_; }
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
int cmp_largest = -1;
if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
assert(curr_level_ == 0 ||
curr_index_in_curr_level_ == start_index_in_curr_level_ ||
user_comparator_->CompareWithoutTimestamp(
user_key_, ExtractUserKey(f->smallest_key)) <= 0);
int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
user_key_, ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->CompareWithoutTimestamp(
user_key_, ExtractUserKey(f->largest_key));
}
if (curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(
curr_level_, curr_index_in_curr_level_, cmp_smallest,
cmp_largest, &search_left_bound_, &search_right_bound_);
}
if (cmp_smallest < 0 || cmp_largest > 0) {
if (curr_level_ == 0) {
++curr_index_in_curr_level_;
continue;
} else {
break;
}
}
}
returned_file_level_ = curr_level_;
if (curr_level_ > 0 && cmp_largest < 0) {
search_ended_ = !PrepareNextLevel();
} else {
++curr_index_in_curr_level_;
}
return f;
}
search_ended_ = !PrepareNextLevel();
}
return nullptr;
}
unsigned int GetHitFileLevel() { return hit_file_level_; }
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
private:
unsigned int num_levels_;
unsigned int curr_level_;
unsigned int returned_file_level_;
unsigned int hit_file_level_;
int32_t search_left_bound_;
int32_t search_right_bound_;
autovector<LevelFilesBrief>* level_files_brief_;
bool search_ended_;
bool is_hit_file_last_in_level_;
LevelFilesBrief* curr_file_level_;
unsigned int curr_index_in_curr_level_;
unsigned int start_index_in_curr_level_;
Slice user_key_;
Slice ikey_;
FileIndexer* file_indexer_;
const Comparator* user_comparator_;
const InternalKeyComparator* internal_comparator_;
bool PrepareNextLevel() {
curr_level_++;
while (curr_level_ < num_levels_) {
curr_file_level_ = &(*level_files_brief_)[curr_level_];
if (curr_file_level_->num_files == 0) {
assert(search_left_bound_ == 0);
assert(search_right_bound_ == -1 ||
search_right_bound_ == FileIndexer::kLevelMaxIndex);
search_left_bound_ = 0;
search_right_bound_ = FileIndexer::kLevelMaxIndex;
curr_level_++;
continue;
}
int32_t start_index;
if (curr_level_ == 0) {
start_index = 0;
} else {
if (search_left_bound_ <= search_right_bound_) {
if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
search_right_bound_ =
static_cast<int32_t>(curr_file_level_->num_files) - 1;
}
start_index =
FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
static_cast<uint32_t>(search_left_bound_),
static_cast<uint32_t>(search_right_bound_) + 1);
if (start_index == search_right_bound_ + 1) {
search_left_bound_ = 0;
search_right_bound_ = FileIndexer::kLevelMaxIndex;
curr_level_++;
continue;
}
} else {
search_left_bound_ = 0;
search_right_bound_ = FileIndexer::kLevelMaxIndex;
curr_level_++;
continue;
}
}
start_index_in_curr_level_ = start_index;
curr_index_in_curr_level_ = start_index;
return true;
}
return false;
}
};
}
class FilePickerMultiGet {
private:
struct FilePickerContext;
public:
FilePickerMultiGet(MultiGetRange* range,
autovector<LevelFilesBrief>* file_levels,
unsigned int num_levels, FileIndexer* file_indexer,
const Comparator* user_comparator,
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(static_cast<unsigned int>(-1)),
returned_file_level_(static_cast<unsigned int>(-1)),
hit_file_level_(static_cast<unsigned int>(-1)),
range_(*range, range->begin(), range->end()),
maybe_repeat_key_(false),
current_level_range_(*range, range->begin(), range->end()),
current_file_range_(*range, range->begin(), range->end()),
batch_iter_(range->begin()),
batch_iter_prev_(range->begin()),
upper_key_(range->begin()),
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
curr_file_level_(nullptr),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator),
hit_file_(nullptr) {
for (auto iter = range_.begin(); iter != range_.end(); ++iter) {
fp_ctx_array_[iter.index()] =
FilePickerContext(0, FileIndexer::kLevelMaxIndex);
}
search_ended_ = !PrepareNextLevel();
if (!search_ended_) {
for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
if (r) {
for (auto iter = range_.begin(); iter != range_.end(); ++iter) {
r->Prepare(iter->ikey);
}
}
}
}
}
FilePickerMultiGet(MultiGetRange* range, const FilePickerMultiGet& other)
: num_levels_(other.num_levels_),
curr_level_(other.curr_level_),
returned_file_level_(other.returned_file_level_),
hit_file_level_(other.hit_file_level_),
fp_ctx_array_(other.fp_ctx_array_),
range_(*range, range->begin(), range->end()),
maybe_repeat_key_(false),
current_level_range_(*range, range->begin(), range->end()),
current_file_range_(*range, range->begin(), range->end()),
batch_iter_(range->begin()),
batch_iter_prev_(range->begin()),
upper_key_(range->begin()),
level_files_brief_(other.level_files_brief_),
is_hit_file_last_in_level_(false),
curr_file_level_(other.curr_file_level_),
file_indexer_(other.file_indexer_),
user_comparator_(other.user_comparator_),
internal_comparator_(other.internal_comparator_),
hit_file_(nullptr) {
PrepareNextLevelForSearch();
}
int GetCurrentLevel() const { return curr_level_; }
void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); }
FdWithKeyRange* GetNextFileInLevel() {
if (batch_iter_ == current_level_range_.end() || search_ended_) {
hit_file_ = nullptr;
return nullptr;
} else {
if (maybe_repeat_key_) {
maybe_repeat_key_ = false;
if (current_level_range_.CheckKeyDone(batch_iter_) ||
curr_level_ == 0) {
batch_iter_ = upper_key_;
}
}
batch_iter_prev_ = batch_iter_;
}
MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
current_level_range_.end());
size_t curr_file_index =
(batch_iter_ != current_level_range_.end())
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
: curr_file_level_->num_files;
FdWithKeyRange* f;
bool is_last_key_in_file;
if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
&is_last_key_in_file)) {
hit_file_ = nullptr;
return nullptr;
} else {
if (is_last_key_in_file) {
auto tmp_iter = batch_iter_;
while (tmp_iter != upper_key_) {
++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
++tmp_iter;
}
maybe_repeat_key_ = true;
}
current_file_range_ =
MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
returned_file_level_ = curr_level_;
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_file_index == curr_file_level_->num_files - 1;
hit_file_ = f;
return f;
}
}
unsigned int GetHitFileLevel() { return hit_file_level_; }
FdWithKeyRange* GetHitFile() { return hit_file_; }
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
bool KeyMaySpanNextFile() { return maybe_repeat_key_; }
bool IsSearchEnded() { return search_ended_; }
const MultiGetRange& CurrentFileRange() { return current_file_range_; }
bool RemainingOverlapInLevel() {
return !current_level_range_.Suffix(current_file_range_).empty();
}
MultiGetRange& GetRange() { return range_; }
void ReplaceRange(const MultiGetRange& other) {
assert(hit_file_ == nullptr);
range_ = other;
current_level_range_ = other;
}
FilePickerMultiGet(FilePickerMultiGet&& other)
: num_levels_(other.num_levels_),
curr_level_(other.curr_level_),
returned_file_level_(other.returned_file_level_),
hit_file_level_(other.hit_file_level_),
fp_ctx_array_(std::move(other.fp_ctx_array_)),
range_(std::move(other.range_)),
maybe_repeat_key_(other.maybe_repeat_key_),
current_level_range_(std::move(other.current_level_range_)),
current_file_range_(std::move(other.current_file_range_)),
batch_iter_(other.batch_iter_, ¤t_level_range_),
batch_iter_prev_(other.batch_iter_prev_, ¤t_level_range_),
upper_key_(other.upper_key_, ¤t_level_range_),
level_files_brief_(other.level_files_brief_),
search_ended_(other.search_ended_),
is_hit_file_last_in_level_(other.is_hit_file_last_in_level_),
curr_file_level_(other.curr_file_level_),
file_indexer_(other.file_indexer_),
user_comparator_(other.user_comparator_),
internal_comparator_(other.internal_comparator_),
hit_file_(other.hit_file_) {}
private:
unsigned int num_levels_;
unsigned int curr_level_;
unsigned int returned_file_level_;
unsigned int hit_file_level_;
struct FilePickerContext {
int32_t search_left_bound;
int32_t search_right_bound;
unsigned int curr_index_in_curr_level;
unsigned int start_index_in_curr_level;
FilePickerContext(int32_t left, int32_t right)
: search_left_bound(left),
search_right_bound(right),
curr_index_in_curr_level(0),
start_index_in_curr_level(0) {}
FilePickerContext() = default;
};
std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
MultiGetRange range_;
bool maybe_repeat_key_;
MultiGetRange current_level_range_;
MultiGetRange current_file_range_;
MultiGetRange::Iterator batch_iter_;
MultiGetRange::Iterator batch_iter_prev_;
MultiGetRange::Iterator upper_key_;
autovector<LevelFilesBrief>* level_files_brief_;
bool search_ended_;
bool is_hit_file_last_in_level_;
LevelFilesBrief* curr_file_level_;
FileIndexer* file_indexer_;
const Comparator* user_comparator_;
const InternalKeyComparator* internal_comparator_;
FdWithKeyRange* hit_file_;
bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
size_t* file_index, FdWithKeyRange** fd,
bool* is_last_key_in_file) {
size_t curr_file_index = *file_index;
FdWithKeyRange* f = nullptr;
bool file_hit = false;
int cmp_largest = -1;
int cmp_smallest = -1;
if (curr_file_index >= curr_file_level_->num_files) {
if (batch_iter_ != current_level_range_.end()) {
#ifndef NDEBUG
if (curr_level_ < num_levels_ + 1) {
if ((*level_files_brief_)[curr_level_].num_files == 0) {
struct FilePickerContext& fp_ctx =
fp_ctx_array_[batch_iter_.index()];
assert(fp_ctx.search_left_bound == 0);
assert(fp_ctx.search_right_bound == -1 ||
fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
}
}
#endif
++batch_iter_;
for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
}
}
return false;
}
while (batch_iter_ != current_level_range_.end() &&
(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
curr_file_index ||
!file_hit)) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
Slice& user_key = batch_iter_->ukey_without_ts;
if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
cmp_smallest = user_comparator_->CompareWithoutTimestamp(
user_key, false, ExtractUserKey(f->smallest_key), true);
assert(curr_level_ == 0 ||
fp_ctx.curr_index_in_curr_level ==
fp_ctx.start_index_in_curr_level ||
cmp_smallest <= 0);
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->CompareWithoutTimestamp(
user_key, false, ExtractUserKey(f->largest_key), true);
} else {
cmp_largest = -1;
}
if (curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(
curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
cmp_largest, &fp_ctx.search_left_bound,
&fp_ctx.search_right_bound);
}
if (cmp_smallest < 0 || cmp_largest > 0) {
next_file_range->SkipKey(batch_iter_);
} else {
file_hit = true;
}
} else {
file_hit = true;
}
if (cmp_largest == 0) {
upper_key_ = batch_iter_;
++upper_key_;
while (upper_key_ != current_level_range_.end() &&
user_comparator_->CompareWithoutTimestamp(
batch_iter_->ukey_without_ts, false,
upper_key_->ukey_without_ts, false) == 0) {
if (curr_level_ > 0) {
struct FilePickerContext& ctx = fp_ctx_array_[upper_key_.index()];
file_indexer_->GetNextLevelIndex(
curr_level_, ctx.curr_index_in_curr_level, cmp_smallest,
cmp_largest, &ctx.search_left_bound, &ctx.search_right_bound);
}
++upper_key_;
}
break;
} else {
if (curr_level_ == 0) {
++fp_ctx.curr_index_in_curr_level;
}
++batch_iter_;
}
if (!file_hit) {
curr_file_index =
(batch_iter_ != current_level_range_.end())
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
: curr_file_level_->num_files;
}
}
*fd = f;
*file_index = curr_file_index;
*is_last_key_in_file = cmp_largest == 0;
if (!*is_last_key_in_file) {
upper_key_ = batch_iter_;
}
return file_hit;
}
bool PrepareNextLevel() {
if (curr_level_ == 0) {
MultiGetRange::Iterator mget_iter = current_level_range_.begin();
if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
curr_file_level_->num_files) {
batch_iter_prev_ = current_level_range_.begin();
upper_key_ = batch_iter_ = current_level_range_.begin();
return true;
}
}
curr_level_++;
while (curr_level_ < num_levels_) {
bool level_contains_keys = false;
curr_file_level_ = &(*level_files_brief_)[curr_level_];
if (curr_file_level_->num_files == 0) {
for (auto mget_iter = current_level_range_.begin();
mget_iter != current_level_range_.end(); ++mget_iter) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
assert(fp_ctx.search_left_bound == 0);
assert(fp_ctx.search_right_bound == -1 ||
fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
}
do {
++curr_level_;
} while ((curr_level_ < num_levels_) &&
(*level_files_brief_)[curr_level_].num_files == 0);
continue;
}
int32_t start_index = -1;
current_level_range_ =
MultiGetRange(range_, range_.begin(), range_.end());
for (auto mget_iter = current_level_range_.begin();
mget_iter != current_level_range_.end(); ++mget_iter) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
if (curr_level_ == 0) {
start_index = 0;
level_contains_keys = true;
} else {
if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
fp_ctx.search_right_bound =
static_cast<int32_t>(curr_file_level_->num_files) - 1;
}
Slice& ikey = mget_iter->ikey;
start_index = FindFileInRange(
*internal_comparator_, *curr_file_level_, ikey,
static_cast<uint32_t>(fp_ctx.search_left_bound),
static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
if (start_index == fp_ctx.search_right_bound + 1) {
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
current_level_range_.SkipKey(mget_iter);
continue;
} else {
level_contains_keys = true;
}
} else {
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
current_level_range_.SkipKey(mget_iter);
continue;
}
}
assert(start_index >= 0);
assert(start_index < static_cast<int32_t>(curr_file_level_->num_files));
fp_ctx.start_index_in_curr_level = start_index;
fp_ctx.curr_index_in_curr_level = start_index;
}
if (level_contains_keys) {
batch_iter_prev_ = current_level_range_.begin();
upper_key_ = batch_iter_ = current_level_range_.begin();
return true;
}
curr_level_++;
}
return false;
}
};
VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
Version::~Version() {
assert(refs_ == 0);
prev_->next_ = next_;
next_->prev_ = prev_;
for (int level = 0; level < storage_info_.num_levels_; level++) {
for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
FileMetaData* f = storage_info_.files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
assert(cfd_ != nullptr);
auto* sv = cfd_->GetSuperVersion();
uint32_t path_id = f->fd.GetPathId();
assert(path_id < cfd_->ioptions().cf_paths.size());
vset_->obsolete_files_.emplace_back(
f, cfd_->ioptions().cf_paths[path_id].path,
sv ? sv->mutable_cf_options.uncache_aggressiveness : 0,
cfd_->GetFileMetadataCacheReservationManager());
}
}
}
}
int FindFile(const InternalKeyComparator& icmp,
const LevelFilesBrief& file_level, const Slice& key) {
return FindFileInRange(icmp, file_level, key, 0,
static_cast<uint32_t>(file_level.num_files));
}
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
const std::vector<FileMetaData*>& files,
Arena* arena) {
assert(file_level);
assert(arena);
size_t num = files.size();
file_level->num_files = num;
char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
file_level->files = new (mem) FdWithKeyRange[num];
for (size_t i = 0; i < num; i++) {
Slice smallest_key = files[i]->smallest.Encode();
Slice largest_key = files[i]->largest.Encode();
size_t smallest_size = smallest_key.size();
size_t largest_size = largest_key.size();
mem = arena->AllocateAligned(smallest_size + largest_size);
memcpy(mem, smallest_key.data(), smallest_size);
memcpy(mem + smallest_size, largest_key.data(), largest_size);
FdWithKeyRange& f = file_level->files[i];
f.fd = files[i]->fd;
f.file_metadata = files[i];
f.smallest_key = Slice(mem, smallest_size);
f.largest_key = Slice(mem + smallest_size, largest_size);
}
}
static bool AfterFile(const Comparator* ucmp, const Slice* user_key,
const FdWithKeyRange* f) {
return (user_key != nullptr &&
ucmp->CompareWithoutTimestamp(*user_key,
ExtractUserKey(f->largest_key)) > 0);
}
static bool BeforeFile(const Comparator* ucmp, const Slice* user_key,
const FdWithKeyRange* f) {
return (user_key != nullptr &&
ucmp->CompareWithoutTimestamp(*user_key,
ExtractUserKey(f->smallest_key)) < 0);
}
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const LevelFilesBrief& file_level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
const Comparator* ucmp = icmp.user_comparator();
if (!disjoint_sorted_files) {
for (size_t i = 0; i < file_level.num_files; i++) {
const FdWithKeyRange* f = &(file_level.files[i]);
if (AfterFile(ucmp, smallest_user_key, f) ||
BeforeFile(ucmp, largest_user_key, f)) {
} else {
return true; }
}
return false;
}
uint32_t index = 0;
if (smallest_user_key != nullptr) {
InternalKey small;
small.SetMinPossibleForUserKey(*smallest_user_key);
index = FindFile(icmp, file_level, small.Encode());
}
if (index >= file_level.num_files) {
return false;
}
return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
namespace {
class LevelIterator final : public InternalIterator {
public:
LevelIterator(
TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options, const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel, const MutableCFOptions& mutable_cf_options,
bool should_sample, HistogramImpl* file_read_hist,
TableReaderCaller caller, bool skip_filters, int level,
RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
nullptr,
bool allow_unprepared_value = false,
std::unique_ptr<TruncatedRangeDelIterator>*** range_tombstone_iter_ptr_ =
nullptr,
Statistics* db_statistics = nullptr, SystemClock* clock = nullptr)
: table_cache_(table_cache),
read_options_(read_options),
file_options_(file_options),
icomparator_(icomparator),
user_comparator_(icomparator.user_comparator()),
flevel_(flevel),
mutable_cf_options_(mutable_cf_options),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
file_read_hist_(file_read_hist),
caller_(caller),
file_index_(flevel_->num_files),
range_del_agg_(range_del_agg),
pinned_iters_mgr_(nullptr),
compaction_boundaries_(compaction_boundaries),
range_tombstone_iter_(nullptr),
read_seq_(read_options.snapshot
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber),
level_(level),
should_sample_(should_sample),
skip_filters_(skip_filters),
allow_unprepared_value_(allow_unprepared_value),
is_next_read_sequential_(false),
to_return_sentinel_(false),
scan_opts_(nullptr),
db_statistics_(db_statistics),
clock_(clock) {
assert(flevel_ != nullptr && flevel_->num_files > 0);
if (range_tombstone_iter_ptr_) {
*range_tombstone_iter_ptr_ = &range_tombstone_iter_;
}
}
~LevelIterator() override {
delete file_iter_.Set(nullptr);
for (auto& entry : prepared_iters_) {
delete entry.second;
}
prepared_iters_.clear();
assert(prepared_iters_.size() == 0);
}
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() final override;
bool NextAndGetResult(IterateResult* result) override;
void Prev() override;
bool Valid() const override {
assert(!(file_iter_.Valid() && to_return_sentinel_));
return file_iter_.Valid() || to_return_sentinel_;
}
Slice key() const override {
assert(Valid());
if (to_return_sentinel_) {
assert(!file_iter_.Valid());
return sentinel_;
}
return file_iter_.key();
}
Slice value() const override {
assert(Valid());
assert(!to_return_sentinel_);
return file_iter_.value();
}
uint64_t write_unix_time() const override {
assert(Valid());
return file_iter_.write_unix_time();
}
Status status() const override {
return file_iter_.iter() ? file_iter_.status() : Status::OK();
}
bool PrepareValue() override { return file_iter_.PrepareValue(); }
inline bool MayBeOutOfLowerBound() override {
assert(Valid());
return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
}
inline IterBoundCheck UpperBoundCheckResult() override {
if (Valid()) {
return file_iter_.UpperBoundCheckResult();
} else {
return IterBoundCheck::kUnknown;
}
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
if (file_iter_.iter()) {
file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
}
bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsKeyPinned();
}
bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsValuePinned();
}
bool IsDeleteRangeSentinelKey() const override { return to_return_sentinel_; }
void SetRangeDelReadSeqno(SequenceNumber read_seq) override {
read_seq_ = read_seq;
}
inline bool FileHasMultiScanArg(size_t file_index) {
if (file_to_scan_opts_.get()) {
auto it = file_to_scan_opts_->find(file_index);
if (it != file_to_scan_opts_->end()) {
return !it->second.empty();
}
}
return false;
}
MultiScanArgs& GetMultiScanArgForFile(size_t file_index) {
auto multi_scan_args_it = file_to_scan_opts_->find(file_index);
if (multi_scan_args_it == file_to_scan_opts_->end()) {
auto ret = file_to_scan_opts_->emplace(
file_index, MultiScanArgs(user_comparator_.user_comparator()));
multi_scan_args_it = ret.first;
assert(ret.second);
}
return multi_scan_args_it->second;
}
void Prepare(const MultiScanArgs* so) override {
if (so == nullptr) {
return;
}
scan_opts_ = so;
assert(so->GetComparator() == user_comparator_.user_comparator());
file_to_scan_opts_ = std::make_unique<ScanOptionsMap>();
for (size_t k = 0; k < scan_opts_->size(); k++) {
const ScanOptions& opt = scan_opts_->GetScanRanges().at(k);
auto start = opt.range.start;
auto end = opt.range.limit;
if (!start.has_value()) {
continue;
}
if (!end.has_value()) {
continue;
}
const size_t timestamp_size =
user_comparator_.user_comparator()->timestamp_size();
InternalKey istart, iend;
if (timestamp_size == 0) {
istart =
InternalKey(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
iend = InternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
} else {
std::string start_key_with_ts, end_key_with_ts;
AppendKeyWithMaxTimestamp(&start_key_with_ts, start.value(),
timestamp_size);
AppendKeyWithMaxTimestamp(&end_key_with_ts, end.value(),
timestamp_size);
istart = InternalKey(start_key_with_ts, kMaxSequenceNumber,
kValueTypeForSeek);
iend =
InternalKey(end_key_with_ts, kMaxSequenceNumber, kValueTypeForSeek);
}
size_t fstart = FindFile(icomparator_, *flevel_, istart.Encode());
size_t fend = FindFile(icomparator_, *flevel_, iend.Encode());
for (auto i = fstart; i <= fend; i++) {
if (i < flevel_->num_files) {
if (icomparator_.InternalKeyComparator::Compare(
iend.Encode(), flevel_->files[i].smallest_key) < 0) {
continue;
}
auto const metadata = flevel_->files[i].file_metadata;
if (metadata->FileIsStandAloneRangeTombstone()) {
continue;
}
auto& args = GetMultiScanArgForFile(i);
args.insert(start.value(), end.value(), opt.property_bag);
}
}
}
StopWatch timer(clock_, db_statistics_, MULTISCAN_PREPARE_ITERATORS);
for (auto& file_to_arg : *file_to_scan_opts_) {
file_to_arg.second.CopyConfigFrom(*so);
assert(OverlapRange(*file_to_arg.second.GetScanRanges().begin(),
file_to_arg.first) &&
OverlapRange(*file_to_arg.second.GetScanRanges().rbegin(),
file_to_arg.first));
}
if (so->use_async_io) {
auto before = file_index_;
for (auto& file_to_arg : *file_to_scan_opts_) {
size_t file_index = file_to_arg.first;
file_index_ = file_index;
auto iter = NewFileIterator();
if (iter != nullptr) {
iter->Prepare(&file_to_arg.second);
prepared_iters_[file_index] = iter;
}
}
file_index_ = before;
}
}
private:
bool SkipEmptyFileForward();
void SkipEmptyFileBackward();
void SetFileIterator(InternalIterator* iter);
void InitFileIterator(size_t new_file_index);
const Slice& file_smallest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].smallest_key;
}
const Slice& file_largest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].largest_key;
}
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
user_comparator_.CompareWithoutTimestamp(
ExtractUserKey(internal_key), true,
*read_options_.iterate_upper_bound, false) >= 0;
}
void ClearRangeTombstoneIter() {
if (range_tombstone_iter_) {
range_tombstone_iter_->reset();
}
}
InternalIterator* NewFileIterator() {
assert(file_index_ < flevel_->num_files);
auto file_meta = flevel_->files[file_index_];
if (should_sample_) {
sample_file_read_inc(file_meta.file_metadata);
}
const InternalKey* smallest_compaction_key = nullptr;
const InternalKey* largest_compaction_key = nullptr;
if (compaction_boundaries_ != nullptr) {
smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
}
CheckMayBeOutOfLowerBound();
ClearRangeTombstoneIter();
return table_cache_->NewIterator(
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, mutable_cf_options_,
nullptr , file_read_hist_, caller_,
nullptr, skip_filters_, level_,
0, smallest_compaction_key,
largest_compaction_key, allow_unprepared_value_, &read_seq_,
range_tombstone_iter_);
}
void CheckMayBeOutOfLowerBound() {
if (read_options_.iterate_lower_bound != nullptr &&
file_index_ < flevel_->num_files) {
may_be_out_of_lower_bound_ =
user_comparator_.CompareWithoutTimestamp(
ExtractUserKey(file_smallest_key(file_index_)), true,
*read_options_.iterate_lower_bound, false) < 0;
}
}
#ifndef NDEBUG
bool OverlapRange(const ScanOptions& opts, size_t file_index);
#endif
TableCache* table_cache_;
const ReadOptions& read_options_;
const FileOptions& file_options_;
const InternalKeyComparator& icomparator_;
const UserComparatorWrapper user_comparator_;
const LevelFilesBrief* flevel_;
mutable FileDescriptor current_value_;
const MutableCFOptions& mutable_cf_options_;
const SliceTransform* prefix_extractor_;
HistogramImpl* file_read_hist_;
TableReaderCaller caller_;
size_t file_index_;
RangeDelAggregator* range_del_agg_;
IteratorWrapper file_iter_; PinnedIteratorsManager* pinned_iters_mgr_;
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
std::unique_ptr<TruncatedRangeDelIterator>* range_tombstone_iter_;
Slice sentinel_;
SequenceNumber read_seq_;
int level_;
bool should_sample_;
bool skip_filters_;
bool allow_unprepared_value_;
bool may_be_out_of_lower_bound_ = true;
bool is_next_read_sequential_;
bool prefix_exhausted_ = false;
bool to_return_sentinel_ = false;
const MultiScanArgs* scan_opts_ = nullptr;
Statistics* db_statistics_ = nullptr;
SystemClock* clock_ = nullptr;
std::unique_ptr<ScanOptionsMap> file_to_scan_opts_ = nullptr;
std::unordered_map<size_t, InternalIterator*> prepared_iters_;
void TrySetDeleteRangeSentinel(const Slice& boundary_key);
void ClearSentinel() { to_return_sentinel_ = false; }
};
void LevelIterator::TrySetDeleteRangeSentinel(const Slice& boundary_key) {
assert(range_tombstone_iter_);
if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
file_iter_.status().ok()) {
to_return_sentinel_ = true;
sentinel_ = boundary_key;
}
}
void LevelIterator::Seek(const Slice& target) {
prefix_exhausted_ = false;
ClearSentinel();
bool need_to_reseek = true;
if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
const FdWithKeyRange& cur_file = flevel_->files[file_index_];
if (icomparator_.InternalKeyComparator::Compare(
target, cur_file.largest_key) <= 0 &&
icomparator_.InternalKeyComparator::Compare(
target, cur_file.smallest_key) >= 0) {
need_to_reseek = false;
assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
file_index_);
}
}
if (need_to_reseek) {
TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
InitFileIterator(new_file_index);
}
if (file_iter_.iter() != nullptr) {
if (scan_opts_) {
const FdWithKeyRange& cur_file = flevel_->files[file_index_];
if (KeyReachedUpperBound(cur_file.smallest_key)) {
return;
}
}
file_iter_.Seek(target);
if (file_iter_.status() == Status::TryAgain()) {
return;
}
if (!file_iter_.Valid() && file_iter_.status().ok() &&
prefix_extractor_ != nullptr && !read_options_.total_order_seek &&
!read_options_.auto_prefix_mode &&
file_index_ < flevel_->num_files - 1) {
size_t ts_sz = user_comparator_.user_comparator()->timestamp_size();
Slice target_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(target, ts_sz);
Slice next_file_first_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(file_smallest_key(file_index_ + 1),
ts_sz);
if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
(!prefix_extractor_->InDomain(next_file_first_user_key_without_ts) ||
prefix_extractor_->Transform(target_user_key_without_ts)
.compare(prefix_extractor_->Transform(
next_file_first_user_key_without_ts)) != 0)) {
prefix_exhausted_ = true;
}
}
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekForPrev(const Slice& target) {
prefix_exhausted_ = false;
ClearSentinel();
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
if (new_file_index == 0 &&
icomparator_.Compare(target, file_smallest_key(0)) < 0) {
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
CheckMayBeOutOfLowerBound();
return;
}
if (new_file_index >= flevel_->num_files) {
new_file_index = flevel_->num_files - 1;
}
InitFileIterator(new_file_index);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekForPrev(target);
if (range_tombstone_iter_ &&
icomparator_.Compare(target, file_smallest_key(file_index_)) >= 0) {
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
SkipEmptyFileBackward();
}
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToFirst() {
prefix_exhausted_ = false;
ClearSentinel();
InitFileIterator(0);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToLast() {
prefix_exhausted_ = false;
ClearSentinel();
InitFileIterator(flevel_->num_files - 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
}
SkipEmptyFileBackward();
CheckMayBeOutOfLowerBound();
}
void LevelIterator::Next() {
assert(Valid());
if (to_return_sentinel_) {
ClearSentinel();
} else {
file_iter_.Next();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
}
bool LevelIterator::NextAndGetResult(IterateResult* result) {
assert(Valid());
bool is_valid = !to_return_sentinel_ && file_iter_.NextAndGetResult(result);
if (!is_valid) {
if (to_return_sentinel_) {
ClearSentinel();
} else if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
is_next_read_sequential_ = true;
SkipEmptyFileForward();
is_next_read_sequential_ = false;
is_valid = Valid();
if (is_valid) {
if (to_return_sentinel_) {
result->key = sentinel_;
result->bound_check_result = IterBoundCheck::kUnknown;
result->value_prepared = true;
} else {
result->key = key();
result->bound_check_result = file_iter_.UpperBoundCheckResult();
result->value_prepared = !allow_unprepared_value_;
}
}
}
return is_valid;
}
void LevelIterator::Prev() {
assert(Valid());
if (to_return_sentinel_) {
ClearSentinel();
} else {
file_iter_.Prev();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
}
SkipEmptyFileBackward();
}
bool LevelIterator::SkipEmptyFileForward() {
bool seen_empty_file = false;
while (!to_return_sentinel_ &&
(file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok() &&
file_iter_.iter()->UpperBoundCheckResult() !=
IterBoundCheck::kOutOfBound))) {
seen_empty_file = true;
if (file_index_ >= flevel_->num_files - 1 ||
KeyReachedUpperBound(file_smallest_key(file_index_ + 1)) ||
prefix_exhausted_) {
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
break;
}
InitFileIterator(file_index_ + 1);
if (file_iter_.iter() != nullptr) {
if (scan_opts_ && FileHasMultiScanArg(file_index_)) {
const ScanOptions& opts =
GetMultiScanArgForFile(file_index_).GetScanRanges().front();
if (opts.range.start.has_value()) {
InternalKey target;
const size_t ts_size =
user_comparator_.user_comparator()->timestamp_size();
if (ts_size == 0) {
target = InternalKey(opts.range.start.value(), kMaxSequenceNumber,
kValueTypeForSeek);
} else {
std::string seek_key;
AppendKeyWithMaxTimestamp(&seek_key, opts.range.start.value(),
ts_size);
target =
InternalKey(seek_key, kMaxSequenceNumber, kValueTypeForSeek);
}
file_iter_.Seek(target.Encode());
}
} else {
file_iter_.SeekToFirst();
}
if (range_tombstone_iter_) {
if (*range_tombstone_iter_) {
(*range_tombstone_iter_)->SeekToFirst();
}
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
}
return seen_empty_file;
}
void LevelIterator::SkipEmptyFileBackward() {
while (!to_return_sentinel_ &&
(file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok()))) {
if (file_index_ == 0) {
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
return;
}
InitFileIterator(file_index_ - 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
if (range_tombstone_iter_) {
if (*range_tombstone_iter_) {
(*range_tombstone_iter_)->SeekToLast();
}
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
if (to_return_sentinel_) {
break;
}
}
}
}
}
#ifndef NDEBUG
bool LevelIterator::OverlapRange(const ScanOptions& opts, size_t file_index) {
return (user_comparator_.CompareWithoutTimestamp(
opts.range.start.value(), false,
ExtractUserKey(flevel_->files[file_index].largest_key),
true) <= 0 &&
user_comparator_.CompareWithoutTimestamp(
opts.range.limit.value(), false,
ExtractUserKey(flevel_->files[file_index].smallest_key),
true) > 0);
}
#endif
void LevelIterator::SetFileIterator(InternalIterator* iter) {
if (pinned_iters_mgr_ && iter) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
InternalIterator* old_iter = file_iter_.Set(iter);
if (is_next_read_sequential_) {
file_iter_.UpdateReadaheadState(old_iter);
}
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(old_iter);
} else {
delete old_iter;
}
}
void LevelIterator::InitFileIterator(size_t new_file_index) {
if (new_file_index >= flevel_->num_files) {
file_index_ = new_file_index;
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
return;
} else {
if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
new_file_index == file_index_) {
} else {
file_index_ = new_file_index;
if (!prepared_iters_.empty()) {
auto prepared_it = prepared_iters_.find(file_index_);
if (prepared_it != prepared_iters_.end()) {
InternalIterator* iter = prepared_it->second;
prepared_iters_.erase(prepared_it);
SetFileIterator(iter);
return;
}
}
InternalIterator* iter = NewFileIterator();
if (FileHasMultiScanArg(file_index_)) {
auto& args = GetMultiScanArgForFile(file_index_);
assert(OverlapRange(*args.GetScanRanges().begin(), file_index_) &&
OverlapRange(*args.GetScanRanges().rbegin(), file_index_));
iter->Prepare(&args);
}
SetFileIterator(iter);
}
}
}
}
Status Version::GetTableProperties(const ReadOptions& read_options,
std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname) const {
auto* table_cache = cfd_->table_cache();
const auto& ioptions = cfd_->ioptions();
Status s = table_cache->GetTableProperties(
file_options_, read_options, cfd_->internal_comparator(), *file_meta, tp,
mutable_cf_options_, true );
if (s.ok()) {
return s;
}
if (!s.IsIncomplete()) {
return s;
}
std::unique_ptr<FSRandomAccessFile> file;
std::string file_name;
if (fname != nullptr) {
file_name = *fname;
} else {
file_name = TableFileName(ioptions.cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
}
FileOptions fopts = file_options_;
fopts.file_checksum = file_meta->file_checksum;
fopts.file_checksum_func_name = file_meta->file_checksum_func_name;
s = ioptions.fs->NewRandomAccessFile(file_name, fopts, &file, nullptr);
if (!s.ok()) {
return s;
}
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), file_name, ioptions.clock , io_tracer_,
ioptions.stats ,
Histograms::SST_READ_MICROS ,
nullptr , nullptr ,
ioptions.listeners));
std::unique_ptr<TableProperties> props;
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Footer::kNullTableMagicNumber , ioptions,
read_options, &props);
if (!s.ok()) {
return s;
}
*tp = std::move(props);
RecordTick(ioptions.stats, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
return s;
}
Status Version::GetPropertiesOfAllTables(
const ReadOptions& read_options, TablePropertiesCollection* props) const {
Status s;
for (int level = 0; level < storage_info_.num_levels_; level++) {
s = GetPropertiesOfAllTables(read_options, props, level);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
std::string* out_str) {
if (max_entries_to_print <= 0) {
return Status::OK();
}
int num_entries_left = max_entries_to_print;
std::stringstream ss;
const ReadOptions read_options;
for (int level = 0; level < storage_info_.num_levels_; level++) {
for (const auto& file_meta : storage_info_.files_[level]) {
auto fname =
TableFileName(cfd_->ioptions().cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
ss << "=== file : " << fname << " ===\n";
TableCache* table_cache = cfd_->table_cache();
std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
Status s = table_cache->GetRangeTombstoneIterator(
read_options, cfd_->internal_comparator(), *file_meta,
mutable_cf_options_, &tombstone_iter);
if (!s.ok()) {
return s;
}
if (tombstone_iter) {
tombstone_iter->SeekToFirst();
while (tombstone_iter->Valid() && num_entries_left > 0) {
ss << "start: " << tombstone_iter->start_key().ToString(true)
<< " end: " << tombstone_iter->end_key().ToString(true)
<< " seq: " << tombstone_iter->seq() << '\n';
tombstone_iter->Next();
num_entries_left--;
}
if (num_entries_left <= 0) {
break;
}
}
}
if (num_entries_left <= 0) {
break;
}
}
assert(num_entries_left >= 0);
if (num_entries_left <= 0) {
ss << "(results may not be complete)\n";
}
*out_str = ss.str();
return Status::OK();
}
Status Version::GetPropertiesOfAllTables(const ReadOptions& read_options,
TablePropertiesCollection* props,
int level) const {
for (const auto& file_meta : storage_info_.files_[level]) {
auto fname =
TableFileName(cfd_->ioptions().cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
std::shared_ptr<const TableProperties> table_properties;
Status s =
GetTableProperties(read_options, &table_properties, file_meta, &fname);
if (s.ok()) {
props->insert({fname, table_properties});
} else {
return s;
}
}
return Status::OK();
}
Status Version::GetPropertiesOfTablesInRange(
const ReadOptions& read_options, const autovector<UserKeyRange>& ranges,
TablePropertiesCollection* props) const {
for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
for (const auto& range : ranges) {
InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
std::vector<FileMetaData*> files;
storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
false);
for (const auto& file_meta : files) {
auto fname =
TableFileName(cfd_->ioptions().cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
if (props->count(fname) == 0) {
std::shared_ptr<const TableProperties> table_properties;
Status s = GetTableProperties(read_options, &table_properties,
file_meta, &fname);
if (s.ok()) {
props->insert({fname, table_properties});
} else {
return s;
}
}
}
}
}
return Status::OK();
}
Status Version::GetPropertiesOfTablesByLevel(
const ReadOptions& read_options,
std::vector<std::unique_ptr<TablePropertiesCollection>>* props_by_level)
const {
Status s;
props_by_level->reserve(storage_info_.num_levels_);
for (int level = 0; level < storage_info_.num_levels_; level++) {
props_by_level->push_back(std::make_unique<TablePropertiesCollection>());
s = GetPropertiesOfAllTables(read_options, props_by_level->back().get(),
level);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status Version::GetAggregatedTableProperties(
const ReadOptions& read_options, std::shared_ptr<const TableProperties>* tp,
int level) {
TablePropertiesCollection props;
Status s;
if (level < 0) {
s = GetPropertiesOfAllTables(read_options, &props);
} else {
s = GetPropertiesOfAllTables(read_options, &props, level);
}
if (!s.ok()) {
return s;
}
auto* new_tp = new TableProperties();
for (const auto& item : props) {
new_tp->Add(*item.second);
}
tp->reset(new_tp);
return Status::OK();
}
size_t Version::GetMemoryUsageByTableReaders(const ReadOptions& read_options) {
size_t total_usage = 0;
for (auto& file_level : storage_info_.level_files_brief_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
file_options_, read_options, cfd_->internal_comparator(),
*file_level.files[i].file_metadata, mutable_cf_options_);
}
}
return total_usage;
}
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
assert(cf_meta);
assert(cfd_);
cf_meta->name = cfd_->GetName();
cf_meta->size = 0;
cf_meta->file_count = 0;
cf_meta->levels.clear();
cf_meta->blob_file_size = 0;
cf_meta->blob_file_count = 0;
cf_meta->blob_files.clear();
const auto& ioptions = cfd_->ioptions();
auto* vstorage = storage_info();
for (int level = 0; level < cfd_->NumberLevels(); level++) {
uint64_t level_size = 0;
cf_meta->file_count += vstorage->LevelFiles(level).size();
std::vector<SstFileMetaData> files;
for (const auto& file : vstorage->LevelFiles(level)) {
uint32_t path_id = file->fd.GetPathId();
std::string file_path;
if (path_id < ioptions.cf_paths.size()) {
file_path = ioptions.cf_paths[path_id].path;
} else {
assert(!ioptions.cf_paths.empty());
file_path = ioptions.cf_paths.back().path;
}
const uint64_t file_number = file->fd.GetNumber();
files.emplace_back(
MakeTableFileName("", file_number), file_number, file_path,
file->fd.GetFileSize(), file->fd.smallest_seqno,
file->fd.largest_seqno, file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
file->being_compacted, file->temperature,
file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
file->TryGetFileCreationTime(), file->epoch_number,
file->file_checksum, file->file_checksum_func_name);
files.back().num_entries = file->num_entries;
files.back().num_deletions = file->num_deletions;
files.back().smallest = file->smallest.Encode().ToString();
files.back().largest = file->largest.Encode().ToString();
level_size += file->fd.GetFileSize();
}
cf_meta->levels.emplace_back(level, level_size, std::move(files));
cf_meta->size += level_size;
}
for (const auto& meta : vstorage->GetBlobFiles()) {
assert(meta);
cf_meta->blob_files.emplace_back(
meta->GetBlobFileNumber(), BlobFileName("", meta->GetBlobFileNumber()),
ioptions.cf_paths.front().path, meta->GetBlobFileSize(),
meta->GetTotalBlobCount(), meta->GetTotalBlobBytes(),
meta->GetGarbageBlobCount(), meta->GetGarbageBlobBytes(),
meta->GetChecksumMethod(), meta->GetChecksumValue());
++cf_meta->blob_file_count;
cf_meta->blob_file_size += meta->GetBlobFileSize();
}
}
void Version::GetColumnFamilyMetaData(
const GetColumnFamilyMetaDataOptions& options,
ColumnFamilyMetaData* cf_meta) {
assert(cf_meta);
assert(cfd_);
cf_meta->name = cfd_->GetName();
cf_meta->size = 0;
cf_meta->file_count = 0;
cf_meta->levels.clear();
cf_meta->blob_file_size = 0;
cf_meta->blob_file_count = 0;
cf_meta->blob_files.clear();
const auto& ioptions = cfd_->ioptions();
auto* vstorage = storage_info();
int first_level = (options.level >= 0) ? options.level : 0;
int last_level =
(options.level >= 0) ? options.level + 1 : cfd_->NumberLevels();
InternalKey ikey_start, ikey_end;
const InternalKey* begin = nullptr;
const InternalKey* end = nullptr;
if (options.range.start.has_value()) {
ikey_start = InternalKey(options.range.start.value(), kMaxSequenceNumber,
kValueTypeForSeek);
begin = &ikey_start;
}
if (options.range.limit.has_value()) {
ikey_end = InternalKey(options.range.limit.value(), kMaxSequenceNumber,
kValueTypeForSeek);
end = &ikey_end;
}
for (int l = first_level; l < last_level; ++l) {
uint64_t level_size = 0;
std::vector<SstFileMetaData> files;
std::vector<FileMetaData*> overlapping_files;
vstorage->GetOverlappingInputs(l, begin, end, &overlapping_files);
for (const auto& file : overlapping_files) {
uint32_t path_id = file->fd.GetPathId();
const auto& file_path = (path_id < ioptions.cf_paths.size())
? ioptions.cf_paths[path_id].path
: ioptions.cf_paths.back().path;
const uint64_t file_number = file->fd.GetNumber();
files.emplace_back(
MakeTableFileName("", file_number), file_number, file_path,
file->fd.GetFileSize(), file->fd.smallest_seqno,
file->fd.largest_seqno, file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
file->being_compacted, file->temperature,
file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
file->TryGetFileCreationTime(), file->epoch_number,
file->file_checksum, file->file_checksum_func_name);
files.back().num_entries = file->num_entries;
files.back().num_deletions = file->num_deletions;
files.back().smallest = file->smallest.Encode().ToString();
files.back().largest = file->largest.Encode().ToString();
level_size += file->fd.GetFileSize();
cf_meta->file_count++;
}
if (!files.empty()) {
cf_meta->levels.emplace_back(l, level_size, std::move(files));
cf_meta->size += level_size;
}
}
}
uint64_t Version::GetSstFilesSize() {
uint64_t sst_files_size = 0;
for (int level = 0; level < storage_info_.num_levels_; level++) {
for (const auto& file_meta : storage_info_.LevelFiles(level)) {
sst_files_size += file_meta->fd.GetFileSize();
}
}
return sst_files_size;
}
void Version::GetSstFilesBoundaryKeys(Slice* smallest_user_key,
Slice* largest_user_key) {
smallest_user_key->clear();
largest_user_key->clear();
bool initialized = false;
const Comparator* ucmp = storage_info_.user_comparator_;
for (int level = 0; level < cfd_->NumberLevels(); level++) {
if (storage_info_.LevelFiles(level).size() == 0) {
continue;
}
if (level == 0) {
for (const auto& file : storage_info_.LevelFiles(level)) {
const Slice& start_user_key = file->smallest.user_key();
if (!initialized ||
ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
*smallest_user_key = start_user_key;
}
const Slice& end_user_key = file->largest.user_key();
if (!initialized ||
ucmp->Compare(end_user_key, *largest_user_key) > 0) {
*largest_user_key = end_user_key;
}
initialized = true;
}
} else {
const Slice& start_user_key =
storage_info_.LevelFiles(level)[0]->smallest.user_key();
if (!initialized ||
ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
*smallest_user_key = start_user_key;
}
const Slice& end_user_key =
storage_info_.LevelFiles(level).back()->largest.user_key();
if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
*largest_user_key = end_user_key;
}
initialized = true;
}
}
}
void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
assert(meta->fd.table_reader != nullptr);
uint64_t file_creation_time = meta->TryGetFileCreationTime();
if (file_creation_time == kUnknownFileCreationTime) {
*creation_time = 0;
return;
}
if (file_creation_time < oldest_time) {
oldest_time = file_creation_time;
}
}
}
*creation_time = oldest_time;
}
InternalIterator* Version::TEST_GetLevelIterator(
const ReadOptions& read_options, MergeIteratorBuilder* merge_iter_builder,
int level, bool allow_unprepared_value) {
auto* arena = merge_iter_builder->GetArena();
auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
std::unique_ptr<TruncatedRangeDelIterator>** tombstone_iter_ptr = nullptr;
auto level_iter = new (mem) LevelIterator(
cfd_->table_cache(), read_options, file_options_,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
nullptr , nullptr ,
allow_unprepared_value, &tombstone_iter_ptr, db_statistics_, clock_);
if (read_options.ignore_range_deletions) {
merge_iter_builder->AddIterator(level_iter);
} else {
merge_iter_builder->AddPointAndTombstoneIterator(
level_iter, nullptr , tombstone_iter_ptr);
}
return level_iter;
}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
if (current_num_samples_ == 0) {
return 0;
}
if (current_num_non_deletions_ <= current_num_deletions_) {
return 0;
}
uint64_t est = current_num_non_deletions_ - current_num_deletions_;
uint64_t file_count = 0;
for (int level = 0; level < num_levels_; ++level) {
file_count += files_[level].size();
}
if (current_num_samples_ < file_count) {
assert(current_num_samples_ != 0);
assert(est != 0);
double multiplier = static_cast<double>(file_count) / current_num_samples_;
double maximum_multiplier =
static_cast<double>(std::numeric_limits<uint64_t>::max()) / est;
if (multiplier >= maximum_multiplier) {
return std::numeric_limits<uint64_t>::max();
}
return static_cast<uint64_t>(est * multiplier);
} else {
return est;
}
}
double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
int level) const {
assert(level < num_levels_);
uint64_t sum_file_size_bytes = 0;
uint64_t sum_data_size_bytes = 0;
for (auto* file_meta : files_[level]) {
auto raw_size = file_meta->raw_key_size + file_meta->raw_value_size;
if (raw_size > 0) {
sum_file_size_bytes += file_meta->fd.GetFileSize();
sum_data_size_bytes += raw_size;
}
}
if (sum_file_size_bytes == 0) {
return -1.0;
}
return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}
void Version::AddIterators(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
bool allow_unprepared_value) {
assert(storage_info_.finalized_);
for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
allow_unprepared_value);
}
}
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level, bool allow_unprepared_value) {
assert(storage_info_.finalized_);
if (level >= storage_info_.num_non_empty_levels()) {
return;
} else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
return;
}
bool should_sample = should_sample_file_read();
auto* arena = merge_iter_builder->GetArena();
if (level == 0) {
std::unique_ptr<TruncatedRangeDelIterator> tombstone_iter = nullptr;
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
auto table_iter = cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, nullptr, mutable_cf_options_,
nullptr, cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
false, 0, max_file_size_for_l0_meta_pin_,
nullptr,
nullptr, allow_unprepared_value,
nullptr, &tombstone_iter);
if (read_options.ignore_range_deletions) {
merge_iter_builder->AddIterator(table_iter);
} else {
merge_iter_builder->AddPointAndTombstoneIterator(
table_iter, std::move(tombstone_iter));
}
}
if (should_sample) {
for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
sample_file_read_inc(meta);
}
}
} else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
std::unique_ptr<TruncatedRangeDelIterator>** tombstone_iter_ptr = nullptr;
auto level_iter = new (mem) LevelIterator(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
nullptr,
nullptr, allow_unprepared_value,
&tombstone_iter_ptr, db_statistics_, clock_);
if (read_options.ignore_range_deletions) {
merge_iter_builder->AddIterator(level_iter);
} else {
merge_iter_builder->AddPointAndTombstoneIterator(
level_iter, nullptr , tombstone_iter_ptr);
}
}
}
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
const FileOptions& file_options,
const Slice& smallest_user_key,
const Slice& largest_user_key,
int level, bool* overlap) {
assert(storage_info_.finalized_);
auto icmp = cfd_->internal_comparator();
auto ucmp = icmp.user_comparator();
Arena arena;
Status status;
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber );
*overlap = false;
if (level == 0) {
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto file = &storage_info_.LevelFilesBrief(0).files[i];
if (AfterFile(ucmp, &smallest_user_key, file) ||
BeforeFile(ucmp, &largest_user_key, file)) {
continue;
}
ScopedArenaPtr<InternalIterator> iter(cfd_->table_cache()->NewIterator(
read_options, file_options, cfd_->internal_comparator(),
*file->file_metadata, &range_del_agg, mutable_cf_options_, nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
false, 0, max_file_size_for_l0_meta_pin_,
nullptr,
nullptr,
false));
status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
iter.get(), overlap);
if (!status.ok() || *overlap) {
break;
}
}
} else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
auto mem = arena.AllocateAligned(sizeof(LevelIterator));
ScopedArenaPtr<InternalIterator> iter(new (mem) LevelIterator(
cfd_->table_cache(), read_options, file_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
&range_del_agg, nullptr, false, nullptr, db_statistics_, clock_));
status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
iter.get(), overlap);
}
if (status.ok() && *overlap == false &&
range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
*overlap = true;
}
return status;
}
VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
bool _force_consistency_checks,
EpochNumberRequirement epoch_number_requirement, SystemClock* clock,
uint32_t bottommost_file_compaction_delay,
OffpeakTimeOption offpeak_time_option)
: internal_comparator_(internal_comparator),
user_comparator_(user_comparator),
num_levels_(levels),
num_non_empty_levels_(0),
file_indexer_(user_comparator),
compaction_style_(compaction_style),
files_(new std::vector<FileMetaData*>[num_levels_]),
base_level_(num_levels_ == 1 ? -1 : 1),
lowest_unnecessary_level_(-1),
level_multiplier_(0.0),
files_by_compaction_pri_(num_levels_),
level0_non_overlapping_(false),
next_file_to_compact_by_size_(num_levels_),
compaction_score_(num_levels_),
compaction_level_(num_levels_),
l0_delay_trigger_count_(0),
compact_cursor_(num_levels_),
accumulated_file_size_(0),
accumulated_raw_key_size_(0),
accumulated_raw_value_size_(0),
accumulated_num_non_deletions_(0),
accumulated_num_deletions_(0),
current_num_non_deletions_(0),
current_num_deletions_(0),
current_num_samples_(0),
estimated_compaction_needed_bytes_(0),
clock_(clock),
bottommost_file_compaction_delay_(bottommost_file_compaction_delay),
finalized_(false),
force_consistency_checks_(_force_consistency_checks),
epoch_number_requirement_(epoch_number_requirement),
offpeak_time_option_(std::move(offpeak_time_option)) {
if (ref_vstorage != nullptr) {
accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
accumulated_num_non_deletions_ =
ref_vstorage->accumulated_num_non_deletions_;
accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
current_num_deletions_ = ref_vstorage->current_num_deletions_;
current_num_samples_ = ref_vstorage->current_num_samples_;
oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
compact_cursor_ = ref_vstorage->compact_cursor_;
compact_cursor_.resize(num_levels_);
}
}
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
const FileOptions& file_opt,
const MutableCFOptions& mutable_cf_options,
const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number,
EpochNumberRequirement epoch_number_requirement)
: env_(vset->env_),
clock_(vset->clock_),
cfd_(column_family_data),
info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions().logger),
db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions().stats),
table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
blob_source_(cfd_ ? cfd_->blob_source() : nullptr),
merge_operator_(
(cfd_ == nullptr) ? nullptr : cfd_->ioptions().merge_operator.get()),
storage_info_(
(cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
(cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
cfd_ == nullptr ? kCompactionStyleLevel
: cfd_->ioptions().compaction_style,
(cfd_ == nullptr || cfd_->current() == nullptr)
? nullptr
: cfd_->current()->storage_info(),
cfd_ == nullptr ? false : cfd_->ioptions().force_consistency_checks,
epoch_number_requirement,
cfd_ == nullptr ? nullptr : cfd_->ioptions().clock,
cfd_ == nullptr ? 0
: mutable_cf_options.bottommost_file_compaction_delay,
vset->offpeak_time_option()),
vset_(vset),
next_(this),
prev_(this),
refs_(0),
file_options_(file_opt),
mutable_cf_options_(mutable_cf_options),
max_file_size_for_l0_meta_pin_(
MaxFileSizeForL0MetaPin(mutable_cf_options_)),
version_number_(version_number),
io_tracer_(io_tracer),
use_async_io_(false) {
if (CheckFSFeatureSupport(env_->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
use_async_io_ = true;
}
}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) const {
BlobIndex blob_index;
{
Status s = blob_index.DecodeFrom(blob_index_slice);
if (!s.ok()) {
return s;
}
}
return GetBlob(read_options, user_key, blob_index, prefetch_buffer, value,
bytes_read);
}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) const {
assert(value);
if (blob_index.HasTTL() || blob_index.IsInlined()) {
return Status::Corruption("Unexpected TTL/inlined blob index");
}
const uint64_t blob_file_number = blob_index.file_number();
auto blob_file_meta = storage_info_.GetBlobFileMetaData(blob_file_number);
if (!blob_file_meta) {
return Status::Corruption("Invalid blob file number");
}
assert(blob_source_);
value->Reset();
const Status s = blob_source_->GetBlob(
read_options, user_key, blob_file_number, blob_index.offset(),
blob_file_meta->GetBlobFileSize(), blob_index.size(),
blob_index.compression(), prefetch_buffer, value, bytes_read);
return s;
}
void Version::MultiGetBlob(
const ReadOptions& read_options, MultiGetRange& range,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs) {
assert(!blob_ctxs.empty());
autovector<BlobFileReadRequests> blob_reqs;
for (auto& ctx : blob_ctxs) {
const auto file_number = ctx.first;
const auto blob_file_meta = storage_info_.GetBlobFileMetaData(file_number);
autovector<BlobReadRequest> blob_reqs_in_file;
BlobReadContexts& blobs_in_file = ctx.second;
for (auto& blob : blobs_in_file) {
const BlobIndex& blob_index = blob.blob_index;
const KeyContext* const key_context = blob.key_context;
assert(key_context);
assert(key_context->get_context);
assert(key_context->s);
if (key_context->value) {
key_context->value->Reset();
} else {
assert(key_context->columns);
key_context->columns->Reset();
}
if (!blob_file_meta) {
*key_context->s = Status::Corruption("Invalid blob file number");
continue;
}
if (blob_index.HasTTL() || blob_index.IsInlined()) {
*key_context->s =
Status::Corruption("Unexpected TTL/inlined blob index");
continue;
}
blob_reqs_in_file.emplace_back(
key_context->get_context->ukey_to_get_blob_value(),
blob_index.offset(), blob_index.size(), blob_index.compression(),
&blob.result, key_context->s);
}
if (blob_reqs_in_file.size() > 0) {
const auto file_size = blob_file_meta->GetBlobFileSize();
blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file);
}
}
if (blob_reqs.size() > 0) {
blob_source_->MultiGetBlob(read_options, blob_reqs,
nullptr);
}
for (auto& ctx : blob_ctxs) {
BlobReadContexts& blobs_in_file = ctx.second;
for (auto& blob : blobs_in_file) {
const KeyContext* const key_context = blob.key_context;
assert(key_context);
assert(key_context->get_context);
assert(key_context->s);
if (key_context->s->ok()) {
if (key_context->value) {
*key_context->value = std::move(blob.result);
range.AddValueSize(key_context->value->size());
} else {
assert(key_context->columns);
key_context->columns->SetPlainValue(std::move(blob.result));
range.AddValueSize(key_context->columns->serialized_size());
}
if (range.GetValueSize() > read_options.value_size_soft_limit) {
*key_context->s = Status::Aborted();
}
} else if (key_context->s->IsIncomplete()) {
auto& get_context = *(key_context->get_context);
get_context.MarkKeyMayExist();
}
}
}
}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, PinnableWideColumns* columns,
std::string* timestamp, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
PinnedIteratorsManager* pinned_iters_mgr, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob, bool do_merge) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress());
if (key_exists != nullptr) {
*key_exists = true;
}
uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
if (vset_ && vset_->block_cache_tracer_ &&
vset_->block_cache_tracer_->is_tracing_enabled()) {
tracing_get_id = vset_->block_cache_tracer_->NextGetId();
}
bool is_blob_index = false;
bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
BlobFetcher blob_fetcher(this, read_options);
assert(pinned_iters_mgr);
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
do_merge ? value : nullptr, do_merge ? columns : nullptr,
do_merge ? timestamp : nullptr, value_found, merge_context, do_merge,
max_covering_tombstone_seq, clock_, seq,
merge_operator_ ? pinned_iters_mgr : nullptr, callback, is_blob_to_use,
tracing_get_id, &blob_fetcher);
if (merge_operator_) {
pinned_iters_mgr->StartPinning();
}
FilePicker fp(user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(),
internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
if (*max_covering_tombstone_seq > 0) {
break;
}
if (get_context.sample()) {
sample_file_read_inc(f->file_metadata);
}
bool timer_enabled =
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(clock_, timer_enabled );
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
fp.GetHitFileLevel());
}
if (!status->ok()) {
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
return;
}
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge &&
db_statistics_ != nullptr) {
get_context.ReportCounters();
}
switch (get_context.State()) {
case GetContext::kNotFound:
break;
case GetContext::kMerge:
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
if (is_blob_index && do_merge && (value || columns)) {
Slice blob_index =
value ? *value
: WideColumnsHelper::GetDefaultColumn(columns->columns());
TEST_SYNC_POINT_CALLBACK("Version::Get::TamperWithBlobIndex",
&blob_index);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice result;
constexpr uint64_t* bytes_read = nullptr;
*status = GetBlob(read_options, get_context.ukey_to_get_blob_value(),
blob_index, prefetch_buffer, &result, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();
}
return;
}
if (value) {
*value = std::move(result);
} else {
assert(columns);
columns->SetPlainValue(std::move(result));
}
}
return;
case GetContext::kDeleted:
*status = Status::NotFound();
return;
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case GetContext::kUnexpectedBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
return;
case GetContext::kMergeOperatorFailed:
*status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
return;
}
f = fp.GetNextFile();
}
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
if (!do_merge) {
*status = Status::OK();
return;
}
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
return;
}
if (value || columns) {
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, MergeHelper::kNoBaseValue,
merge_context->GetOperands(), info_log_, db_statistics_, clock_,
true, nullptr,
value ? value->GetSelf() : nullptr, columns);
if (status->ok()) {
if (LIKELY(value != nullptr)) {
value->PinSelf();
}
}
}
} else {
if (key_exists != nullptr) {
*key_exists = false;
}
*status = Status::NotFound(); }
}
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback) {
PinnedIteratorsManager pinned_iters_mgr;
if (merge_operator_) {
pinned_iters_mgr.StartPinning();
}
uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
if (vset_ && vset_->block_cache_tracer_ &&
vset_->block_cache_tracer_->is_tracing_enabled()) {
tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
}
autovector<GetContext, 16> get_ctx;
BlobFetcher blob_fetcher(this, read_options);
for (auto iter = range->begin(); iter != range->end(); ++iter) {
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
iter->ukey_with_ts, iter->value, iter->columns, iter->timestamp,
nullptr, &(iter->merge_context), true,
&iter->max_covering_tombstone_seq, clock_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
&iter->is_blob_index, tracing_mget_id, &blob_fetcher);
*(iter->s) = Status::OK();
}
int get_ctx_index = 0;
for (auto iter = range->begin(); iter != range->end();
++iter, get_ctx_index++) {
iter->get_context = &(get_ctx[get_ctx_index]);
}
Status s;
std::unordered_map<uint64_t, BlobReadContexts> blob_ctxs;
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
#if USE_COROUTINES
if (read_options.async_io && read_options.optimize_multiget_for_io &&
using_coroutines() && use_async_io_) {
s = MultiGetAsync(read_options, range, &blob_ctxs);
} else
#endif {
MultiGetRange file_picker_range(*range, range->begin(), range->end());
FilePickerMultiGet fp(&file_picker_range, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(),
internal_comparator());
FdWithKeyRange* f = fp.GetNextFileInLevel();
uint64_t num_index_read = 0;
uint64_t num_filter_read = 0;
uint64_t num_sst_read = 0;
uint64_t num_level_read = 0;
int prev_level = -1;
while (!fp.IsSearchEnded()) {
bool dump_stats_for_l0_file = false;
if (!read_options.async_io || !using_coroutines() || !use_async_io_ ||
fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
if (f) {
bool skip_filters =
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel());
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
fp.GetHitFileLevel(), skip_filters,
false, f, blob_ctxs,
nullptr, num_filter_read,
num_index_read, num_sst_read);
if (fp.GetHitFileLevel() == 0) {
dump_stats_for_l0_file = true;
}
}
if (s.ok()) {
f = fp.GetNextFileInLevel();
}
#if USE_COROUTINES
} else {
std::vector<folly::coro::Task<Status>> mget_tasks;
while (f != nullptr) {
MultiGetRange file_range = fp.CurrentFileRange();
TableCache::TypedHandle* table_handle = nullptr;
bool skip_filters =
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel());
bool skip_range_deletions = false;
if (!skip_filters) {
Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle);
skip_range_deletions = true;
if (status.ok()) {
skip_filters = true;
} else if (!status.IsNotSupported()) {
s = status;
}
}
if (!s.ok()) {
break;
}
if (!file_range.empty()) {
mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
read_options, file_range, fp.GetHitFileLevel(), skip_filters,
skip_range_deletions, f, blob_ctxs, table_handle,
num_filter_read, num_index_read, num_sst_read));
}
if (fp.KeyMaySpanNextFile()) {
break;
}
f = fp.GetNextFileInLevel();
}
if (mget_tasks.size() > 0) {
RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
mget_tasks.size());
std::vector<Status> statuses =
folly::coro::blockingWait(co_withExecutor(
&range->context()->executor(),
folly::coro::collectAllRange(std::move(mget_tasks))));
if (s.ok()) {
for (Status stat : statuses) {
if (!stat.ok()) {
s = std::move(stat);
break;
}
}
}
if (s.ok() && fp.KeyMaySpanNextFile()) {
f = fp.GetNextFileInLevel();
}
}
#endif }
if (!s.ok() || file_picker_range.empty()) {
break;
}
if (!f) {
fp.PrepareNextLevelForSearch();
if (!fp.IsSearchEnded()) {
f = fp.GetNextFileInLevel();
}
if (dump_stats_for_l0_file ||
(prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) {
if (num_filter_read + num_index_read) {
RecordInHistogram(db_statistics_,
NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
num_index_read + num_filter_read);
}
if (num_sst_read) {
RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL,
num_sst_read);
num_level_read++;
}
num_filter_read = 0;
num_index_read = 0;
num_sst_read = 0;
}
prev_level = fp.GetHitFileLevel();
}
}
if (num_filter_read + num_index_read) {
RecordInHistogram(db_statistics_,
NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
num_index_read + num_filter_read);
}
if (num_sst_read) {
RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
num_level_read++;
}
if (num_level_read) {
RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET,
num_level_read);
}
}
if (!blob_ctxs.empty()) {
MultiGetBlob(read_options, keys_with_blobs_range, blob_ctxs);
}
for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
Slice user_key = iter->lkey->user_key();
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
range->MarkKeyDone(iter);
continue;
}
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, MergeHelper::kNoBaseValue,
iter->merge_context.GetOperands(), info_log_, db_statistics_, clock_,
true, nullptr,
iter->value ? iter->value->GetSelf() : nullptr, iter->columns);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
} else {
assert(iter->columns);
range->AddValueSize(iter->columns->serialized_size());
}
range->MarkKeyDone(iter);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}
} else {
range->MarkKeyDone(iter);
*status = Status::NotFound(); }
}
for (auto iter = range->begin(); iter != range->end(); ++iter) {
range->MarkKeyDone(iter);
*(iter->s) = s;
}
}
#ifdef USE_COROUTINES
Status Version::ProcessBatch(
const ReadOptions& read_options, FilePickerMultiGet* batch,
std::vector<folly::coro::Task<Status>>& mget_tasks,
std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs,
autovector<FilePickerMultiGet, 4>& batches, std::deque<size_t>& waiting,
std::deque<size_t>& to_process, unsigned int& num_tasks_queued,
std::unordered_map<int, std::tuple<uint64_t, uint64_t, uint64_t>>&
mget_stats) {
FilePickerMultiGet& fp = *batch;
MultiGetRange range = fp.GetRange();
MultiGetRange leftover(range, range.begin(), range.begin());
FdWithKeyRange* f = nullptr;
Status s;
f = fp.GetNextFileInLevel();
while (!f) {
fp.PrepareNextLevelForSearch();
if (!fp.IsSearchEnded()) {
f = fp.GetNextFileInLevel();
} else {
break;
}
}
while (f) {
MultiGetRange file_range = fp.CurrentFileRange();
TableCache::TypedHandle* table_handle = nullptr;
bool skip_filters = IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel());
bool skip_range_deletions = false;
if (!skip_filters) {
Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle);
if (status.ok()) {
skip_filters = true;
skip_range_deletions = true;
} else if (!status.IsNotSupported()) {
s = status;
}
}
if (!s.ok()) {
break;
}
leftover += ~file_range;
range -= ~file_range;
if (!file_range.empty()) {
int level = fp.GetHitFileLevel();
auto stat = mget_stats.find(level);
if (stat == mget_stats.end()) {
auto entry = mget_stats.insert({level, {0, 0, 0}});
assert(entry.second);
stat = entry.first;
}
if (waiting.empty() && to_process.empty() &&
!fp.RemainingOverlapInLevel() && leftover.empty() &&
mget_tasks.empty()) {
s = MultiGetFromSST(read_options, file_range, fp.GetHitFileLevel(),
skip_filters, skip_range_deletions, f, *blob_ctxs,
table_handle, std::get<0>(stat->second),
std::get<1>(stat->second),
std::get<2>(stat->second));
} else {
mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
read_options, file_range, fp.GetHitFileLevel(), skip_filters,
skip_range_deletions, f, *blob_ctxs, table_handle,
std::get<0>(stat->second), std::get<1>(stat->second),
std::get<2>(stat->second)));
++num_tasks_queued;
}
}
if (fp.KeyMaySpanNextFile() && !file_range.empty()) {
break;
}
f = fp.GetNextFileInLevel();
}
if (s.ok() && !f && !leftover.empty() && !range.empty()) {
fp.ReplaceRange(range);
batches.emplace_back(&leftover, fp);
to_process.emplace_back(batches.size() - 1);
}
if (!f && !range.empty() && !num_tasks_queued) {
fp.PrepareNextLevelForSearch();
}
return s;
}
Status Version::MultiGetAsync(
const ReadOptions& options, MultiGetRange* range,
std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs) {
autovector<FilePickerMultiGet, 4> batches;
std::deque<size_t> waiting;
std::deque<size_t> to_process;
Status s;
std::vector<folly::coro::Task<Status>> mget_tasks;
std::unordered_map<int, std::tuple<uint64_t, uint64_t, uint64_t>> mget_stats;
batches.emplace_back(range, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(),
internal_comparator());
to_process.emplace_back(0);
while (!to_process.empty()) {
batches.reserve(batches.size() + 1);
size_t idx = to_process.front();
FilePickerMultiGet* batch = &batches.at(idx);
unsigned int num_tasks_queued = 0;
to_process.pop_front();
if (batch->IsSearchEnded() || batch->GetRange().empty()) {
if (!to_process.empty()) {
continue;
}
} else {
s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
to_process, num_tasks_queued, mget_stats);
if (!num_tasks_queued && !batch->IsSearchEnded()) {
to_process.emplace_back(idx);
} else if (num_tasks_queued) {
waiting.emplace_back(idx);
}
}
if (to_process.empty() || !s.ok()) {
if (mget_tasks.size() > 0) {
assert(waiting.size());
RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
std::vector<Status> statuses =
folly::coro::blockingWait(co_withExecutor(
&range->context()->executor(),
folly::coro::collectAllRange(std::move(mget_tasks))));
mget_tasks.clear();
if (s.ok()) {
for (Status stat : statuses) {
if (!stat.ok()) {
s = std::move(stat);
break;
}
}
}
if (!s.ok()) {
break;
}
for (size_t wait_idx : waiting) {
FilePickerMultiGet& fp = batches.at(wait_idx);
if (!fp.GetHitFile() && !fp.GetRange().empty()) {
fp.PrepareNextLevelForSearch();
}
}
to_process.swap(waiting);
} else {
assert(!s.ok() || waiting.size() == 0);
}
}
if (!s.ok()) {
break;
}
}
uint64_t num_levels = 0;
for (auto& stat : mget_stats) {
if (stat.first == 0) {
num_levels += std::get<2>(stat.second);
} else {
num_levels++;
}
uint64_t num_meta_reads =
std::get<0>(stat.second) + std::get<1>(stat.second);
uint64_t num_sst_reads = std::get<2>(stat.second);
if (num_meta_reads > 0) {
RecordInHistogram(db_statistics_,
NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
num_meta_reads);
}
if (num_sst_reads > 0) {
RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_reads);
}
}
if (num_levels > 0) {
RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET, num_levels);
}
return s;
}
#endif
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
return cfd_->ioptions().optimize_filters_for_hits &&
(level > 0 || is_file_last_in_level) &&
level == storage_info_.num_non_empty_levels() - 1;
}
void VersionStorageInfo::GenerateLevelFilesBrief() {
level_files_brief_.resize(num_non_empty_levels_);
for (int level = 0; level < num_non_empty_levels_; level++) {
DoGenerateLevelFilesBrief(&level_files_brief_[level], files_[level],
&arena_);
}
}
void VersionStorageInfo::PrepareForVersionAppend(
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options) {
ComputeCompensatedSizes();
UpdateNumNonEmptyLevels();
CalculateBaseBytes(immutable_options, mutable_cf_options);
UpdateFilesByCompactionPri(immutable_options, mutable_cf_options);
GenerateFileIndexer();
GenerateLevelFilesBrief();
GenerateLevel0NonOverlapping();
GenerateBottommostFiles();
GenerateFileLocationIndex();
}
void Version::PrepareAppend(const ReadOptions& read_options,
bool update_stats) {
TEST_SYNC_POINT_CALLBACK(
"Version::PrepareAppend:forced_check",
static_cast<void*>(&storage_info_.force_consistency_checks_));
if (update_stats) {
UpdateAccumulatedStats(read_options);
}
storage_info_.PrepareForVersionAppend(cfd_->ioptions(), mutable_cf_options_);
}
bool Version::MaybeInitializeFileMetaData(const ReadOptions& read_options,
FileMetaData* file_meta) {
if (file_meta->init_stats_from_file || file_meta->compensated_file_size > 0) {
return false;
}
std::shared_ptr<const TableProperties> tp;
Status s = GetTableProperties(read_options, &tp, file_meta);
file_meta->init_stats_from_file = true;
if (!s.ok()) {
ROCKS_LOG_ERROR(vset_->db_options_->info_log,
"Unable to load table properties for file %" PRIu64
" --- %s\n",
file_meta->fd.GetNumber(), s.ToString().c_str());
return false;
}
if (tp.get() == nullptr) {
return false;
}
file_meta->num_entries = tp->num_entries;
file_meta->num_deletions = tp->num_deletions;
file_meta->raw_value_size = tp->raw_value_size;
file_meta->raw_key_size = tp->raw_key_size;
file_meta->num_range_deletions = tp->num_range_deletions;
file_meta->num_deletions =
std::max(tp->num_deletions, tp->num_range_deletions);
file_meta->num_entries = std::max(tp->num_entries, file_meta->num_deletions);
return true;
}
void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
nullptr);
assert(file_meta->init_stats_from_file);
accumulated_file_size_ += file_meta->fd.GetFileSize();
accumulated_raw_key_size_ += file_meta->raw_key_size;
accumulated_raw_value_size_ += file_meta->raw_value_size;
assert(file_meta->num_entries >= file_meta->num_deletions);
accumulated_num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
accumulated_num_deletions_ += file_meta->num_deletions;
current_num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
current_num_deletions_ += file_meta->num_deletions;
current_num_samples_++;
}
void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
if (file_meta->init_stats_from_file) {
current_num_non_deletions_ -=
file_meta->num_entries - file_meta->num_deletions;
current_num_deletions_ -= file_meta->num_deletions;
current_num_samples_--;
}
}
void Version::UpdateAccumulatedStats(const ReadOptions& read_options) {
const int kMaxInitCount = 20;
int init_count = 0;
for (int level = 0;
level < storage_info_.num_levels_ && init_count < kMaxInitCount;
++level) {
for (auto* file_meta : storage_info_.files_[level]) {
if (MaybeInitializeFileMetaData(read_options, file_meta)) {
storage_info_.UpdateAccumulatedStats(file_meta);
if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
TableCache::kInfiniteCapacity) {
continue;
}
if (++init_count >= kMaxInitCount) {
break;
}
}
}
}
for (int level = storage_info_.num_levels_ - 1;
storage_info_.accumulated_raw_value_size_ == 0 && level >= 0; --level) {
for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
if (MaybeInitializeFileMetaData(read_options,
storage_info_.files_[level][i])) {
storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
}
}
}
}
void VersionStorageInfo::ComputeCompensatedSizes() {
static const int kDeletionWeightOnCompaction = 2;
uint64_t average_value_size = GetAverageValueSize();
for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) {
if (file_meta->compensated_file_size == 0) {
file_meta->compensated_file_size = file_meta->fd.GetFileSize();
if ((file_meta->num_deletions - file_meta->num_range_deletions) * 2 >=
file_meta->num_entries) {
file_meta->compensated_file_size +=
((file_meta->num_deletions - file_meta->num_range_deletions) * 2 -
file_meta->num_entries) *
average_value_size * kDeletionWeightOnCompaction;
}
file_meta->compensated_file_size +=
file_meta->compensated_range_deletion_size;
}
}
}
}
int VersionStorageInfo::MaxInputLevel() const {
if (compaction_style_ == kCompactionStyleLevel) {
return num_levels() - 2;
}
return 0;
}
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
if (allow_ingest_behind) {
assert(num_levels() > 1);
return num_levels() - 2;
}
return num_levels() - 1;
}
void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) {
if (compaction_style_ != kCompactionStyleLevel) {
estimated_compaction_needed_bytes_ = 0;
return;
}
uint64_t bytes_compact_to_next_level = 0;
uint64_t level_size = 0;
for (auto* f : files_[0]) {
level_size += f->fd.GetFileSize();
}
bool level0_compact_triggered = false;
if (static_cast<int>(files_[0].size()) >=
mutable_cf_options.level0_file_num_compaction_trigger ||
level_size >= mutable_cf_options.max_bytes_for_level_base) {
level0_compact_triggered = true;
estimated_compaction_needed_bytes_ = level_size;
bytes_compact_to_next_level = level_size;
} else {
estimated_compaction_needed_bytes_ = 0;
}
uint64_t bytes_next_level = 0;
for (int level = base_level(); level <= MaxInputLevel(); level++) {
level_size = 0;
if (bytes_next_level > 0) {
#ifndef NDEBUG
uint64_t level_size2 = 0;
for (auto* f : files_[level]) {
level_size2 += f->fd.GetFileSize();
}
assert(level_size2 == bytes_next_level);
#endif
level_size = bytes_next_level;
bytes_next_level = 0;
} else {
for (auto* f : files_[level]) {
level_size += f->fd.GetFileSize();
}
}
if (level == base_level() && level0_compact_triggered) {
estimated_compaction_needed_bytes_ += level_size;
}
level_size += bytes_compact_to_next_level;
bytes_compact_to_next_level = 0;
uint64_t level_target = MaxBytesForLevel(level);
if (level_size > level_target) {
bytes_compact_to_next_level = level_size - level_target;
assert(bytes_next_level == 0);
if (level + 1 < num_levels_) {
for (auto* f : files_[level + 1]) {
bytes_next_level += f->fd.GetFileSize();
}
}
if (bytes_next_level > 0) {
assert(level_size > 0);
estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
static_cast<double>(bytes_compact_to_next_level) *
(static_cast<double>(bytes_next_level) /
static_cast<double>(level_size) +
1));
}
}
}
}
namespace {
uint32_t GetExpiredTtlFilesCount(const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
const std::vector<FileMetaData*>& files) {
uint32_t ttl_expired_files_count = 0;
int64_t _current_time;
auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (status.ok()) {
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (FileMetaData* f : files) {
if (!f->being_compacted) {
uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
if (oldest_ancester_time != 0 &&
oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
ttl_expired_files_count++;
}
}
}
}
return ttl_expired_files_count;
}
bool ShouldChangeFileTemperature(const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
const std::vector<FileMetaData*>& files) {
const std::vector<FileTemperatureAge>& ages =
mutable_cf_options.compaction_options_fifo
.file_temperature_age_thresholds;
if (ages.empty()) {
return false;
}
if (files.empty()) {
return false;
}
int64_t _current_time;
auto status = ioptions.clock->GetCurrentTime(&_current_time);
const uint64_t current_time = static_cast<uint64_t>(_current_time);
if (status.ok() && current_time >= ages[0].age) {
uint64_t create_time_threshold = current_time - ages[0].age;
Temperature target_temp;
assert(files.size() >= 1);
for (size_t index = files.size(); index >= 1; --index) {
FileMetaData* cur_file = files[index - 1];
FileMetaData* prev_file = index < 2 ? nullptr : files[index - 2];
if (!cur_file->being_compacted) {
uint64_t est_newest_key_time = cur_file->TryGetNewestKeyTime(prev_file);
if (est_newest_key_time == kUnknownNewestKeyTime) {
continue;
}
if (est_newest_key_time > create_time_threshold) {
return false;
}
target_temp = ages[0].temperature;
for (size_t i = 1; i < ages.size(); ++i) {
if (current_time >= ages[i].age &&
est_newest_key_time <= current_time - ages[i].age) {
target_temp = ages[i].temperature;
}
}
if (cur_file->temperature != target_temp) {
return true;
}
}
}
}
return false;
}
}
void VersionStorageInfo::ComputeCompactionScore(
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const std::string& full_history_ts_low) {
double total_downcompact_bytes = 0.0;
const double kScoreScale = 10.0;
int max_output_level =
MaxOutputLevel(immutable_options.cf_allow_ingest_behind ||
immutable_options.allow_ingest_behind);
for (int level = 0; level <= MaxInputLevel(); level++) {
double score;
if (level == 0) {
int num_sorted_runs = 0;
uint64_t total_size = 0;
for (auto* f : files_[level]) {
total_downcompact_bytes += static_cast<double>(f->fd.GetFileSize());
if (!f->being_compacted) {
total_size += f->compensated_file_size;
num_sorted_runs++;
}
}
if (compaction_style_ == kCompactionStyleUniversal) {
for (int i = 1; i <= max_output_level; i++) {
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}
}
}
if (compaction_style_ == kCompactionStyleFIFO) {
const auto& fifo_opts = mutable_cf_options.compaction_options_fifo;
uint64_t effective_size = total_size;
uint64_t effective_max = fifo_opts.max_table_files_size;
if (fifo_opts.max_data_files_size > 0) {
effective_size += GetBlobStats().total_file_size;
effective_max = fifo_opts.max_data_files_size;
}
if (effective_max == 0) {
effective_max = 1;
}
score = static_cast<double>(effective_size) / effective_max;
if (score < 1 && fifo_opts.allow_compaction) {
score = std::max(
static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger,
score);
}
if (score < 1 && mutable_cf_options.ttl > 0) {
score =
std::max(static_cast<double>(GetExpiredTtlFilesCount(
immutable_options, mutable_cf_options, files_[0])),
score);
}
if (score < 1 &&
ShouldChangeFileTemperature(immutable_options, mutable_cf_options,
files_[0])) {
const double kScoreForNeedCompaction = 1.1;
score = kScoreForNeedCompaction;
}
} else {
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
if (immutable_options.level_compaction_dynamic_level_bytes) {
if (total_size >= mutable_cf_options.max_bytes_for_level_base) {
score = std::max(score, 1.01);
}
if (total_size > level_max_bytes_[base_level_]) {
uint64_t base_level_size = 0;
for (auto f : files_[base_level_]) {
base_level_size += f->compensated_file_size;
}
score = std::max(score, static_cast<double>(total_size) /
static_cast<double>(std::max(
base_level_size,
level_max_bytes_[base_level_])));
}
if (score > 1.0) {
score *= kScoreScale;
}
} else {
score = std::max(score,
static_cast<double>(total_size) /
mutable_cf_options.max_bytes_for_level_base);
}
}
}
} else { uint64_t level_bytes_no_compacting = 0;
uint64_t level_total_bytes = 0;
for (auto f : files_[level]) {
level_total_bytes += f->fd.GetFileSize();
if (!f->being_compacted) {
level_bytes_no_compacting += f->compensated_file_size;
}
}
if (!immutable_options.level_compaction_dynamic_level_bytes) {
score = static_cast<double>(level_bytes_no_compacting) /
MaxBytesForLevel(level);
} else {
if (level_bytes_no_compacting < MaxBytesForLevel(level)) {
score = static_cast<double>(level_bytes_no_compacting) /
MaxBytesForLevel(level);
} else {
score = static_cast<double>(level_bytes_no_compacting) /
(MaxBytesForLevel(level) + total_downcompact_bytes) *
kScoreScale;
}
if (level_bytes_no_compacting > 0 &&
level <= lowest_unnecessary_level_) {
score = std::max(
score, kScoreScale *
(1.001 + 0.001 * (lowest_unnecessary_level_ - level)));
}
}
if (level <= lowest_unnecessary_level_) {
total_downcompact_bytes += level_total_bytes;
} else if (level_total_bytes > MaxBytesForLevel(level)) {
total_downcompact_bytes +=
static_cast<double>(level_total_bytes - MaxBytesForLevel(level));
}
}
compaction_level_[level] = level;
compaction_score_[level] = score;
}
for (int i = 0; i < num_levels() - 2; i++) {
for (int j = i + 1; j < num_levels() - 1; j++) {
if (compaction_score_[i] < compaction_score_[j]) {
double score = compaction_score_[i];
int level = compaction_level_[i];
compaction_score_[i] = compaction_score_[j];
compaction_level_[i] = compaction_level_[j];
compaction_score_[j] = score;
compaction_level_[j] = level;
}
}
}
ComputeFilesMarkedForCompaction(max_output_level);
ComputeBottommostFilesMarkedForCompaction(
immutable_options.cf_allow_ingest_behind ||
immutable_options.allow_ingest_behind,
immutable_options.user_comparator, full_history_ts_low);
ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
ComputeFilesMarkedForPeriodicCompaction(
immutable_options, mutable_cf_options.periodic_compaction_seconds,
max_output_level);
ComputeFilesMarkedForForcedBlobGC(
mutable_cf_options.blob_garbage_collection_age_cutoff,
mutable_cf_options.blob_garbage_collection_force_threshold,
mutable_cf_options.enable_blob_garbage_collection);
EstimateCompactionBytesNeeded(mutable_cf_options);
}
void VersionStorageInfo::ComputeFilesMarkedForCompaction(int last_level) {
files_marked_for_compaction_.clear();
int last_qualify_level = 0;
standalone_range_tombstone_files_mark_threshold_ = kMaxSequenceNumber;
for (int level = last_level; level >= 1; level--) {
if (!files_[level].empty()) {
last_qualify_level = level - 1;
break;
}
}
for (int level = 0; level <= last_qualify_level; level++) {
for (auto* f : files_[level]) {
if (!f->being_compacted && f->marked_for_compaction) {
files_marked_for_compaction_.emplace_back(level, f);
if (f->FileIsStandAloneRangeTombstone()) {
standalone_range_tombstone_files_mark_threshold_ =
std::min(standalone_range_tombstone_files_mark_threshold_,
f->fd.smallest_seqno);
}
}
}
}
}
void VersionStorageInfo::ComputeExpiredTtlFiles(
const ImmutableOptions& ioptions, const uint64_t ttl) {
expired_ttl_files_.clear();
if (ttl == 0 || compaction_style_ != CompactionStyle::kCompactionStyleLevel) {
return;
}
int64_t _current_time;
auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
return;
}
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (int level = 0; level < num_levels() - 1; level++) {
for (FileMetaData* f : files_[level]) {
if (!f->being_compacted) {
uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
if (oldest_ancester_time > 0 &&
oldest_ancester_time < (current_time - ttl)) {
expired_ttl_files_.emplace_back(level, f);
}
}
}
}
}
void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
const ImmutableOptions& ioptions,
const uint64_t periodic_compaction_seconds, int last_level) {
files_marked_for_periodic_compaction_.clear();
if (periodic_compaction_seconds == 0) {
return;
}
int64_t temp_current_time;
auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
if (!status.ok()) {
return;
}
const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
if (periodic_compaction_seconds > current_time) {
return;
}
const uint64_t allowed_time_limit =
current_time - periodic_compaction_seconds;
const OffpeakTimeInfo offpeak_time_info =
offpeak_time_option_.GetOffpeakTimeInfo(current_time);
const uint64_t adjusted_allowed_time_limit =
allowed_time_limit +
(offpeak_time_info.is_now_offpeak
? offpeak_time_info.seconds_till_next_offpeak_start
: 0);
for (int level = 0; level <= last_level; level++) {
for (auto f : files_[level]) {
if (!f->being_compacted) {
uint64_t file_modification_time = f->TryGetFileCreationTime();
if (file_modification_time == kUnknownFileCreationTime) {
file_modification_time = f->TryGetOldestAncesterTime();
}
if (file_modification_time == kUnknownOldestAncesterTime) {
auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
f->fd.GetPathId());
status = ioptions.env->GetFileModificationTime(
file_path, &file_modification_time);
if (!status.ok()) {
ROCKS_LOG_WARN(ioptions.logger,
"Can't get file modification time: %s: %s",
file_path.c_str(), status.ToString().c_str());
continue;
}
}
if (file_modification_time > 0 &&
file_modification_time < adjusted_allowed_time_limit) {
files_marked_for_periodic_compaction_.emplace_back(level, f);
}
}
}
}
}
void VersionStorageInfo::ComputeFilesMarkedForForcedBlobGC(
double blob_garbage_collection_age_cutoff,
double blob_garbage_collection_force_threshold,
bool enable_blob_garbage_collection) {
files_marked_for_forced_blob_gc_.clear();
if (!(enable_blob_garbage_collection &&
blob_garbage_collection_age_cutoff > 0.0 &&
blob_garbage_collection_force_threshold < 1.0)) {
return;
}
if (blob_files_.empty()) {
return;
}
const size_t cutoff_count = static_cast<size_t>(
blob_garbage_collection_age_cutoff * blob_files_.size());
if (!cutoff_count) {
return;
}
const auto& oldest_meta = blob_files_.front();
assert(oldest_meta);
const auto& linked_ssts = oldest_meta->GetLinkedSsts();
assert(!linked_ssts.empty());
size_t count = 1;
uint64_t sum_total_blob_bytes = oldest_meta->GetTotalBlobBytes();
uint64_t sum_garbage_blob_bytes = oldest_meta->GetGarbageBlobBytes();
assert(cutoff_count <= blob_files_.size());
for (; count < cutoff_count; ++count) {
const auto& meta = blob_files_[count];
assert(meta);
sum_total_blob_bytes += meta->GetTotalBlobBytes();
sum_garbage_blob_bytes += meta->GetGarbageBlobBytes();
}
if (sum_garbage_blob_bytes <
blob_garbage_collection_force_threshold * sum_total_blob_bytes) {
return;
}
for (uint64_t sst_file_number : linked_ssts) {
const FileLocation location = GetFileLocation(sst_file_number);
assert(location.IsValid());
const int level = location.GetLevel();
assert(level >= 0);
const size_t pos = location.GetPosition();
FileMetaData* const sst_meta = files_[level][pos];
assert(sst_meta);
if (sst_meta->being_compacted) {
continue;
}
files_marked_for_forced_blob_gc_.emplace_back(level, sst_meta);
}
}
namespace {
struct Fsize {
size_t index;
FileMetaData* file;
};
bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
return (first.file->compensated_file_size >
second.file->compensated_file_size);
}
}
void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
auto& level_files = files_[level];
level_files.push_back(f);
f->refs++;
}
void VersionStorageInfo::AddBlobFile(
std::shared_ptr<BlobFileMetaData> blob_file_meta) {
assert(blob_file_meta);
assert(blob_files_.empty() ||
(blob_files_.back() && blob_files_.back()->GetBlobFileNumber() <
blob_file_meta->GetBlobFileNumber()));
blob_files_.emplace_back(std::move(blob_file_meta));
}
VersionStorageInfo::BlobFiles::const_iterator
VersionStorageInfo::GetBlobFileMetaDataLB(uint64_t blob_file_number) const {
return std::lower_bound(
blob_files_.begin(), blob_files_.end(), blob_file_number,
[](const std::shared_ptr<BlobFileMetaData>& lhs, uint64_t rhs) {
assert(lhs);
return lhs->GetBlobFileNumber() < rhs;
});
}
void VersionStorageInfo::SetFinalized() {
finalized_ = true;
#ifndef NDEBUG
if (compaction_style_ != kCompactionStyleLevel) {
return;
}
assert(base_level_ < 0 || num_levels() == 1 ||
(base_level_ >= 1 && base_level_ < num_levels()));
for (int level = 1; level < base_level(); level++) {
assert(NumLevelBytes(level) == 0);
}
uint64_t max_bytes_prev_level = 0;
for (int level = base_level(); level < num_levels() - 1; level++) {
if (LevelFiles(level).size() == 0) {
continue;
}
assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
max_bytes_prev_level = MaxBytesForLevel(level);
}
for (int level = 0; level < num_levels(); level++) {
assert(LevelFiles(level).size() == 0 ||
LevelFiles(level).size() == LevelFilesBrief(level).num_files);
if (LevelFiles(level).size() > 0) {
assert(level < num_non_empty_levels());
}
}
assert(compaction_level_.size() > 0);
assert(compaction_level_.size() == compaction_score_.size());
#endif
}
void VersionStorageInfo::UpdateNumNonEmptyLevels() {
num_non_empty_levels_ = num_levels_;
for (int i = num_levels_ - 1; i >= 0; i--) {
if (files_[i].size() != 0) {
return;
} else {
num_non_empty_levels_ = i;
}
}
}
namespace {
void SortFileByOverlappingRatio(
const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
const std::vector<FileMetaData*>& next_level_files, SystemClock* clock,
int level, int num_non_empty_levels, uint64_t ttl,
std::vector<Fsize>* temp) {
std::unordered_map<uint64_t, uint64_t> file_to_order;
auto next_level_it = next_level_files.begin();
int64_t curr_time;
Status status = clock->GetCurrentTime(&curr_time);
if (!status.ok()) {
ttl = 0;
}
FileTtlBooster ttl_booster(static_cast<uint64_t>(curr_time), ttl,
num_non_empty_levels, level);
for (auto& file : files) {
uint64_t overlapping_bytes = 0;
while (next_level_it != next_level_files.end() &&
icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
next_level_it++;
}
while (next_level_it != next_level_files.end() &&
icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
overlapping_bytes += (*next_level_it)->fd.file_size;
if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
break;
}
next_level_it++;
}
uint64_t ttl_boost_score = (ttl > 0) ? ttl_booster.GetBoostScore(file) : 1;
assert(ttl_boost_score > 0);
assert(file->compensated_file_size != 0);
file_to_order[file->fd.GetNumber()] = overlapping_bytes * 1024U /
file->compensated_file_size /
ttl_boost_score;
}
size_t num_to_sort = temp->size() > VersionStorageInfo::kNumberFilesToSort
? VersionStorageInfo::kNumberFilesToSort
: temp->size();
std::partial_sort(
temp->begin(), temp->begin() + num_to_sort, temp->end(),
[&](const Fsize& f1, const Fsize& f2) -> bool {
if (f1.file->marked_for_compaction == f2.file->marked_for_compaction) {
if (file_to_order[f1.file->fd.GetNumber()] ==
file_to_order[f2.file->fd.GetNumber()]) {
return icmp.Compare(f1.file->smallest, f2.file->smallest) < 0;
}
return file_to_order[f1.file->fd.GetNumber()] <
file_to_order[f2.file->fd.GetNumber()];
} else {
return f1.file->marked_for_compaction >
f2.file->marked_for_compaction;
}
});
}
void SortFileByRoundRobin(const InternalKeyComparator& icmp,
std::vector<InternalKey>* compact_cursor,
bool level0_non_overlapping, int level,
std::vector<Fsize>* temp) {
if (level == 0 && !level0_non_overlapping) {
std::sort(temp->begin(), temp->end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->fd.smallest_seqno < f2.file->fd.smallest_seqno;
});
return;
}
bool should_move_files =
compact_cursor->at(level).size() > 0 && temp->size() > 1;
std::vector<Fsize>::iterator current_file_iter;
if (should_move_files) {
current_file_iter = std::lower_bound(
temp->begin(), temp->end(), compact_cursor->at(level),
[&](const Fsize& f, const InternalKey& cursor) -> bool {
return icmp.Compare(cursor, f.file->smallest) > 0;
});
should_move_files =
current_file_iter != temp->end() && current_file_iter != temp->begin();
}
if (should_move_files) {
std::vector<Fsize> local_temp;
local_temp.reserve(temp->size());
for (auto iter = current_file_iter; iter != temp->end(); iter++) {
local_temp.push_back(*iter);
}
for (auto iter = temp->begin(); iter != current_file_iter; iter++) {
local_temp.push_back(*iter);
}
for (size_t i = 0; i < local_temp.size(); i++) {
temp->at(i) = local_temp[i];
}
}
}
}
void VersionStorageInfo::UpdateFilesByCompactionPri(
const ImmutableOptions& ioptions, const MutableCFOptions& options) {
if (compaction_style_ == kCompactionStyleNone ||
compaction_style_ == kCompactionStyleFIFO ||
compaction_style_ == kCompactionStyleUniversal) {
return;
}
for (int level = 0; level < num_levels() - 1; level++) {
const std::vector<FileMetaData*>& files = files_[level];
auto& files_by_compaction_pri = files_by_compaction_pri_[level];
assert(files_by_compaction_pri.size() == 0);
std::vector<Fsize> temp(files.size());
for (size_t i = 0; i < files.size(); i++) {
temp[i].index = i;
temp[i].file = files[i];
}
size_t num = VersionStorageInfo::kNumberFilesToSort;
if (num > temp.size()) {
num = temp.size();
}
switch (ioptions.compaction_pri) {
case kByCompensatedSize:
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareCompensatedSizeDescending);
break;
case kOldestLargestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->fd.largest_seqno <
f2.file->fd.largest_seqno;
});
break;
case kOldestSmallestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->fd.smallest_seqno <
f2.file->fd.smallest_seqno;
});
break;
case kMinOverlappingRatio:
SortFileByOverlappingRatio(*internal_comparator_, files_[level],
files_[level + 1], ioptions.clock, level,
num_non_empty_levels_, options.ttl, &temp);
break;
case kRoundRobin:
SortFileByRoundRobin(*internal_comparator_, &compact_cursor_,
level0_non_overlapping_, level, &temp);
break;
default:
assert(false);
}
assert(temp.size() == files.size());
for (size_t i = 0; i < temp.size(); i++) {
files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
}
next_file_to_compact_by_size_[level] = 0;
assert(files_[level].size() == files_by_compaction_pri_[level].size());
}
}
void VersionStorageInfo::GenerateLevel0NonOverlapping() {
assert(!finalized_);
level0_non_overlapping_ = true;
if (level_files_brief_.size() == 0) {
return;
}
std::vector<FdWithKeyRange> level0_sorted_file(
level_files_brief_[0].files,
level_files_brief_[0].files + level_files_brief_[0].num_files);
std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
[this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
return (internal_comparator_->Compare(f1.smallest_key,
f2.smallest_key) < 0);
});
for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
FdWithKeyRange& f = level0_sorted_file[i];
FdWithKeyRange& prev = level0_sorted_file[i - 1];
if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
level0_non_overlapping_ = false;
break;
}
}
}
void VersionStorageInfo::GenerateBottommostFiles() {
assert(!finalized_);
assert(bottommost_files_.empty());
for (size_t level = 0; level < level_files_brief_.size(); ++level) {
for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
++file_idx) {
const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
int l0_file_idx;
if (level == 0) {
l0_file_idx = static_cast<int>(file_idx);
} else {
l0_file_idx = -1;
}
Slice smallest_user_key = ExtractUserKey(f.smallest_key);
Slice largest_user_key = ExtractUserKey(f.largest_key);
if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
static_cast<int>(level),
l0_file_idx)) {
bottommost_files_.emplace_back(static_cast<int>(level),
f.file_metadata);
}
}
}
}
void VersionStorageInfo::GenerateFileLocationIndex() {
size_t num_files = 0;
for (int level = 0; level < num_levels_; ++level) {
num_files += files_[level].size();
}
file_locations_.reserve(num_files);
for (int level = 0; level < num_levels_; ++level) {
for (size_t pos = 0; pos < files_[level].size(); ++pos) {
const FileMetaData* const meta = files_[level][pos];
assert(meta);
const uint64_t file_number = meta->fd.GetNumber();
assert(file_locations_.find(file_number) == file_locations_.end());
file_locations_.emplace(file_number, FileLocation(level, pos));
}
}
}
void VersionStorageInfo::UpdateOldestSnapshot(
SequenceNumber seqnum, bool allow_ingest_behind, const Comparator* ucmp,
const std::string& full_history_ts_low) {
assert(seqnum >= oldest_snapshot_seqnum_);
oldest_snapshot_seqnum_ = seqnum;
if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
ComputeBottommostFilesMarkedForCompaction(allow_ingest_behind, ucmp,
full_history_ts_low);
}
}
void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction(
bool allow_ingest_behind, const Comparator* ucmp,
const std::string& full_history_ts_low) {
bottommost_files_marked_for_compaction_.clear();
bottommost_files_mark_threshold_ = kMaxSequenceNumber;
if (allow_ingest_behind) {
return;
}
int64_t creation_time_ub = 0;
bool needs_delay = bottommost_file_compaction_delay_ > 0;
if (needs_delay) {
int64_t current_time = 0;
clock_->GetCurrentTime(¤t_time).PermitUncheckedError();
creation_time_ub =
current_time - static_cast<int64_t>(bottommost_file_compaction_delay_);
}
const bool has_udt = ucmp && ucmp->timestamp_size() > 0;
for (auto& level_and_file : bottommost_files_) {
if (!level_and_file.second->being_compacted &&
level_and_file.second->fd.largest_seqno != 0) {
if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
if (has_udt) {
const std::string& max_ts = level_and_file.second->max_timestamp;
if (!max_ts.empty()) {
if (full_history_ts_low.empty()) {
continue;
}
if (ucmp->CompareTimestamp(Slice(max_ts), full_history_ts_low) >=
0) {
continue;
}
}
}
if (!needs_delay) {
bottommost_files_marked_for_compaction_.push_back(level_and_file);
} else if (creation_time_ub > 0) {
int64_t creation_time = static_cast<int64_t>(
level_and_file.second->TryGetFileCreationTime());
if (creation_time == kUnknownFileCreationTime ||
creation_time <= creation_time_ub) {
bottommost_files_marked_for_compaction_.push_back(level_and_file);
} else {
}
} else {
}
} else {
bottommost_files_mark_threshold_ =
std::min(bottommost_files_mark_threshold_,
level_and_file.second->fd.largest_seqno);
}
}
}
}
void Version::Ref() { ++refs_; }
bool Version::Unref() {
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
delete this;
return true;
}
return false;
}
bool VersionStorageInfo::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
if (level >= num_non_empty_levels_) {
return false;
}
return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
level_files_brief_[level], smallest_user_key,
largest_user_key);
}
void VersionStorageInfo::GetOverlappingInputs(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool expand_range, const FileMetaData* starting_l0_file,
InternalKey** next_smallest) const {
if (level >= num_non_empty_levels_) {
return;
}
inputs->clear();
if (file_index) {
*file_index = -1;
}
const Comparator* user_cmp = user_comparator_;
if (level > 0) {
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
file_index, false, next_smallest);
return;
}
if (next_smallest) {
*next_smallest = nullptr;
}
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
std::list<size_t> index;
size_t start_index = 0;
if (starting_l0_file != nullptr) {
uint64_t starting_file_number = starting_l0_file->fd.GetNumber();
for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
if (level_files_brief_[level].files[i].fd.GetNumber() ==
starting_file_number) {
start_index = i;
break;
}
}
assert(start_index < level_files_brief_[level].num_files);
}
for (size_t i = start_index; i < level_files_brief_[level].num_files; i++) {
index.emplace_back(i);
}
while (!index.empty()) {
bool found_overlapping_file = false;
auto iter = index.begin();
while (iter != index.end()) {
FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
const Slice file_start = ExtractUserKey(f->smallest_key);
const Slice file_limit = ExtractUserKey(f->largest_key);
if (begin != nullptr &&
user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
iter++;
} else if (end != nullptr &&
user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
iter++;
} else {
inputs->emplace_back(files_[level][*iter]);
found_overlapping_file = true;
if (file_index && *file_index == -1) {
*file_index = static_cast<int>(*iter);
}
iter = index.erase(iter);
if (expand_range) {
if (begin != nullptr &&
user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
user_begin = file_start;
}
if (end != nullptr &&
user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
user_end = file_limit;
}
}
}
}
if (!found_overlapping_file) {
break;
}
}
}
void VersionStorageInfo::GetCleanInputsWithinInterval(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
inputs->clear();
if (file_index) {
*file_index = -1;
}
if (level >= num_non_empty_levels_ || level == 0 ||
level_files_brief_[level].num_files == 0) {
return;
}
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
file_index, true );
}
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool within_interval, InternalKey** next_smallest) const {
assert(level > 0);
auto user_cmp = user_comparator_;
const FdWithKeyRange* files = level_files_brief_[level].files;
const int num_files = static_cast<int>(level_files_brief_[level].num_files);
int start_index = 0;
int end_index = num_files;
if (begin != nullptr) {
auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
const InternalKey* k) {
auto& file_key = within_interval ? f.file_metadata->smallest
: f.file_metadata->largest;
return sstableKeyCompare(user_cmp, file_key, *k) < 0;
};
start_index = static_cast<int>(
std::lower_bound(files,
files + (hint_index == -1 ? num_files : hint_index),
begin, cmp) -
files);
if (start_index > 0 && within_interval) {
bool is_overlapping = true;
while (is_overlapping && start_index < num_files) {
auto& pre_limit = files[start_index - 1].file_metadata->largest;
auto& cur_start = files[start_index].file_metadata->smallest;
is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
start_index += is_overlapping;
}
}
}
if (end != nullptr) {
auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
const FdWithKeyRange& f) {
auto& file_key = within_interval ? f.file_metadata->largest
: f.file_metadata->smallest;
return sstableKeyCompare(user_cmp, *k, file_key) < 0;
};
end_index = static_cast<int>(
std::upper_bound(files + start_index, files + num_files, end, cmp) -
files);
if (end_index < num_files && within_interval) {
bool is_overlapping = true;
while (is_overlapping && end_index > start_index) {
auto& next_start = files[end_index].file_metadata->smallest;
auto& cur_limit = files[end_index - 1].file_metadata->largest;
is_overlapping =
sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
end_index -= is_overlapping;
}
}
}
assert(start_index <= end_index);
if (start_index == end_index) {
if (next_smallest) {
*next_smallest = nullptr;
}
return;
}
assert(start_index < end_index);
if (file_index) {
*file_index = start_index;
}
for (int i = start_index; i < end_index; i++) {
inputs->push_back(files_[level][i]);
}
if (next_smallest != nullptr) {
if (end_index < static_cast<int>(files_[level].size())) {
**next_smallest = files_[level][end_index]->smallest;
} else {
*next_smallest = nullptr;
}
}
}
uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < num_levels());
return TotalFileSize(files_[level]);
}
const char* VersionStorageInfo::LevelSummary(
LevelSummaryStorage* scratch) const {
int len = 0;
if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
if (level_multiplier_ != 0.0) {
len = snprintf(
scratch->buffer, sizeof(scratch->buffer),
"base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
base_level_, level_multiplier_, level_max_bytes_[base_level_]);
}
}
len +=
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
for (int i = 0; i < num_levels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
if (ret < 0 || ret >= sz) {
break;
}
len += ret;
}
if (len > 0) {
--len;
}
len +=
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
"] max score %.2f, estimated pending compaction bytes %" PRIu64,
compaction_score_[0], estimated_compaction_needed_bytes_);
if (!files_marked_for_compaction_.empty()) {
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
" (%" ROCKSDB_PRIszt " files need compaction)",
files_marked_for_compaction_.size());
}
return scratch->buffer;
}
const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
int level) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (const auto& f : files_[level]) {
int sz = sizeof(scratch->buffer) - len;
char sztxt[16];
AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
int ret = snprintf(scratch->buffer + len, sz,
"#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
static_cast<int>(f->being_compacted));
if (ret < 0 || ret >= sz) {
break;
}
len += ret;
}
if (files_[level].size() && len > 0) {
--len;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}
bool VersionStorageInfo::HasMissingEpochNumber() const {
for (int level = 0; level < num_levels_; ++level) {
for (const FileMetaData* f : files_[level]) {
if (f->epoch_number == kUnknownEpochNumber) {
return true;
}
}
}
return false;
}
uint64_t VersionStorageInfo::GetMaxEpochNumberOfFiles() const {
uint64_t max_epoch_number = kUnknownEpochNumber;
for (int level = 0; level < num_levels_; ++level) {
for (const FileMetaData* f : files_[level]) {
max_epoch_number = std::max(max_epoch_number, f->epoch_number);
}
}
return max_epoch_number;
}
void VersionStorageInfo::RecoverEpochNumbers(ColumnFamilyData* cfd,
bool restart_epoch, bool force) {
if (restart_epoch) {
cfd->ResetNextEpochNumber();
bool reserve_epoch_num_for_file_ingested_behind = cfd->AllowIngestBehind();
if (reserve_epoch_num_for_file_ingested_behind) {
uint64_t reserved_epoch_number = cfd->NewEpochNumber();
assert(reserved_epoch_number ==
kReservedEpochNumberForFileIngestedBehind);
ROCKS_LOG_INFO(cfd->ioptions().info_log.get(),
"[%s]CF has reserved epoch number %" PRIu64
" for files ingested "
"behind since `Options::allow_ingest_behind` or "
"`Options::cf_allow_ingest_behind` is true",
cfd->GetName().c_str(), reserved_epoch_number);
}
}
bool missing_epoch_number = HasMissingEpochNumber();
if (missing_epoch_number || force) {
for (int level = num_levels_ - 1; level >= 1; --level) {
auto& files_at_level = files_[level];
if (files_at_level.empty()) {
continue;
}
uint64_t next_epoch_number = cfd->NewEpochNumber();
for (FileMetaData* f : files_at_level) {
f->epoch_number = next_epoch_number;
}
}
for (auto file_meta_iter = files_[0].rbegin();
file_meta_iter != files_[0].rend(); file_meta_iter++) {
FileMetaData* f = *file_meta_iter;
f->epoch_number = cfd->NewEpochNumber();
}
if (missing_epoch_number) {
assert(epoch_number_requirement_ ==
EpochNumberRequirement::kMightMissing);
ROCKS_LOG_WARN(cfd->ioptions().info_log.get(),
"[%s]CF's epoch numbers are inferred based on seqno",
cfd->GetName().c_str());
epoch_number_requirement_ = EpochNumberRequirement::kMustPresent;
}
} else {
assert(epoch_number_requirement_ == EpochNumberRequirement::kMustPresent);
cfd->SetNextEpochNumber(
std::max(GetMaxEpochNumberOfFiles() + 1, cfd->GetNextEpochNumber()));
}
}
uint64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
uint64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < num_levels() - 1; level++) {
for (const auto& f : files_[level]) {
GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
assert(level >= 0);
assert(level < static_cast<int>(level_max_bytes_.size()));
return level_max_bytes_[level];
}
void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
const MutableCFOptions& options) {
int num_l0_count = static_cast<int>(files_[0].size());
if (compaction_style_ == kCompactionStyleUniversal) {
for (int i = 1; i < num_levels(); i++) {
if (!files_[i].empty()) {
num_l0_count++;
}
}
}
set_l0_delay_trigger_count(num_l0_count);
level_max_bytes_.resize(ioptions.num_levels);
if (!ioptions.level_compaction_dynamic_level_bytes) {
base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
level_max_bytes_[i] = options.max_bytes_for_level_base;
} else if (i > 1) {
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1],
options.max_bytes_for_level_multiplier),
options.MaxBytesMultiplerAdditional(i - 1));
} else {
level_max_bytes_[i] = options.max_bytes_for_level_base;
}
}
} else {
assert(ioptions.compaction_style == kCompactionStyleLevel);
uint64_t max_level_size = 0;
int first_non_empty_level = -1;
for (int i = 1; i < num_levels_; i++) {
uint64_t total_size = 0;
for (const auto& f : files_[i]) {
total_size += f->fd.GetFileSize();
}
if (total_size > 0 && first_non_empty_level == -1) {
first_non_empty_level = i;
}
if (total_size > max_level_size) {
max_level_size = total_size;
}
}
for (int i = 0; i < num_levels_; i++) {
level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
}
lowest_unnecessary_level_ = -1;
if (max_level_size == 0) {
base_level_ = num_levels_ - 1;
} else {
assert(first_non_empty_level >= 1);
uint64_t base_bytes_max = options.max_bytes_for_level_base;
uint64_t base_bytes_min = static_cast<uint64_t>(
base_bytes_max / options.max_bytes_for_level_multiplier);
uint64_t cur_level_size = max_level_size;
for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
cur_level_size = static_cast<uint64_t>(
cur_level_size / options.max_bytes_for_level_multiplier);
if (lowest_unnecessary_level_ == -1 &&
cur_level_size <= base_bytes_min &&
(options.preclude_last_level_data_seconds == 0 ||
i < num_levels_ - 2)) {
lowest_unnecessary_level_ = i;
}
}
uint64_t base_level_size;
if (cur_level_size <= base_bytes_min) {
assert(first_non_empty_level == num_levels_ - 1 ||
options.preclude_last_level_data_seconds > 0 ||
lowest_unnecessary_level_ != -1);
base_level_size = base_bytes_min + 1U;
base_level_ = first_non_empty_level;
if (base_level_ < num_levels_ - 1) {
ROCKS_LOG_INFO(
ioptions.logger,
"More existing levels in DB than needed: all non-zero "
"levels <= level %d are unnecessary. "
"max_bytes_for_level_multiplier may not be guaranteed.",
lowest_unnecessary_level_);
}
} else {
assert(lowest_unnecessary_level_ == -1);
base_level_ = first_non_empty_level;
while (base_level_ > 1 && cur_level_size > base_bytes_max) {
--base_level_;
cur_level_size = static_cast<uint64_t>(
cur_level_size / options.max_bytes_for_level_multiplier);
}
if (cur_level_size > base_bytes_max) {
assert(base_level_ == 1);
base_level_size = base_bytes_max;
} else {
base_level_size = std::max(static_cast<uint64_t>(1), cur_level_size);
}
}
level_multiplier_ = options.max_bytes_for_level_multiplier;
assert(base_level_size > 0);
uint64_t level_size = base_level_size;
for (int i = base_level_; i < num_levels_; i++) {
if (i > base_level_) {
level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
}
level_max_bytes_[i] = std::max(level_size, base_bytes_max);
}
}
}
}
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
uint64_t size = 0;
auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
return internal_comparator_->Compare(*x, *y) < 0;
};
std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
for (int l = num_levels_ - 1; l >= 0; l--) {
bool found_end = false;
for (auto file : files_[l]) {
auto lb = (found_end && l != 0) ? ranges.end()
: ranges.lower_bound(&file->smallest);
found_end = (lb == ranges.end());
if (found_end || internal_comparator_->Compare(
file->largest, (*lb).second->smallest) < 0) {
ranges.emplace_hint(lb, &file->largest, file);
size += file->fd.file_size;
}
}
}
for (const auto& meta : blob_files_) {
assert(meta);
size += meta->GetTotalBlobBytes();
size -= meta->GetGarbageBlobBytes();
}
return size;
}
bool VersionStorageInfo::RangeMightExistAfterSortedRun(
const Slice& smallest_user_key, const Slice& largest_user_key,
int last_level, int last_l0_idx) {
assert((last_l0_idx != -1) == (last_level == 0));
if (last_level == 0 &&
last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
return true;
}
for (int level = last_level + 1; level < num_levels(); level++) {
if (files_[level].size() > 0 &&
(last_level == 0 ||
OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
return true;
}
}
return false;
}
Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint(
int level, CompactionStyleSet compaction_style_set) const {
if (!compaction_style_set.Contains(compaction_style_)) {
return Env::WLTH_NOT_SET;
}
switch (compaction_style_) {
case kCompactionStyleLevel:
if (level == 0) {
return Env::WLTH_MEDIUM;
}
if (level - base_level_ >= 2) {
return Env::WLTH_EXTREME;
} else if (level < base_level_) {
return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(
level - base_level_ + static_cast<int>(Env::WLTH_MEDIUM));
case kCompactionStyleUniversal:
if (level == 0) {
return Env::WLTH_SHORT;
}
if (level == 1) {
return Env::WLTH_MEDIUM;
}
return Env::WLTH_LONG;
default:
return Env::WLTH_NOT_SET;
}
}
void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);
assert(live_blob_files);
for (int level = 0; level < storage_info_.num_levels(); ++level) {
const auto& level_files = storage_info_.LevelFiles(level);
for (const auto& meta : level_files) {
assert(meta);
live_table_files->emplace_back(meta->fd.GetNumber());
}
}
const auto& blob_files = storage_info_.GetBlobFiles();
for (const auto& meta : blob_files) {
assert(meta);
live_blob_files->emplace_back(meta->GetBlobFileNumber());
}
}
void Version::RemoveLiveFiles(
std::vector<ObsoleteFileInfo>& sst_delete_candidates,
std::vector<ObsoleteBlobFileInfo>& blob_delete_candidates) const {
for (ObsoleteFileInfo& fi : sst_delete_candidates) {
if (!fi.only_delete_metadata &&
storage_info()->GetFileLocation(fi.metadata->fd.GetNumber()) !=
VersionStorageInfo::FileLocation::Invalid()) {
fi.only_delete_metadata = true;
}
}
blob_delete_candidates.erase(
std::remove_if(
blob_delete_candidates.begin(), blob_delete_candidates.end(),
[this](ObsoleteBlobFileInfo& x) {
return storage_info()->GetBlobFileMetaData(x.GetBlobFileNumber());
}),
blob_delete_candidates.end());
}
std::string Version::DebugString(bool hex, bool print_stats) const {
std::string r;
for (int level = 0; level < storage_info_.num_levels_; level++) {
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" --- version# ");
AppendNumberTo(&r, version_number_);
if (storage_info_.compact_cursor_[level].Valid()) {
r.append(" --- compact_cursor: ");
r.append(storage_info_.compact_cursor_[level].DebugString(hex));
}
r.append(" ---\n");
const std::vector<FileMetaData*>& files = storage_info_.files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
AppendNumberTo(&r, files[i]->fd.GetNumber());
r.push_back(':');
AppendNumberTo(&r, files[i]->fd.GetFileSize());
r.append("[");
AppendNumberTo(&r, files[i]->fd.smallest_seqno);
r.append(" .. ");
AppendNumberTo(&r, files[i]->fd.largest_seqno);
r.append("]");
r.append("[");
r.append(files[i]->smallest.DebugString(hex));
r.append(" .. ");
r.append(files[i]->largest.DebugString(hex));
r.append("]");
if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
r.append(" blob_file:");
AppendNumberTo(&r, files[i]->oldest_blob_file_number);
}
if (print_stats) {
r.append("(");
r.append(std::to_string(
files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
r.append(")");
}
r.append("\n");
}
}
const auto& blob_files = storage_info_.GetBlobFiles();
if (!blob_files.empty()) {
r.append("--- blob files --- version# ");
AppendNumberTo(&r, version_number_);
r.append(" ---\n");
for (const auto& blob_file_meta : blob_files) {
assert(blob_file_meta);
r.append(blob_file_meta->DebugString());
r.push_back('\n');
}
}
return r;
}
struct VersionSet::ManifestWriter {
Status status;
bool done;
InstrumentedCondVar cv;
ColumnFamilyData* cfd;
const autovector<VersionEdit*>& edit_list;
const std::function<void(const Status&)> manifest_write_callback;
explicit ManifestWriter(
InstrumentedMutex* mu, ColumnFamilyData* _cfd,
const autovector<VersionEdit*>& e,
const std::function<void(const Status&)>& manifest_wcb)
: done(false),
cv(mu),
cfd(_cfd),
edit_list(e),
manifest_write_callback(manifest_wcb) {}
~ManifestWriter() { status.PermitUncheckedError(); }
bool IsAllWalEdits() const {
bool all_wal_edits = true;
for (const auto& e : edit_list) {
if (!e->IsWalManipulation()) {
all_wal_edits = false;
break;
}
}
return all_wal_edits;
}
};
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
assert(edit);
if (edit->IsInAtomicGroup()) {
TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
if (replay_buffer_.empty()) {
replay_buffer_.resize(edit->GetRemainingEntries() + 1);
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
}
read_edits_in_atomic_group_++;
if (read_edits_in_atomic_group_ + edit->GetRemainingEntries() !=
static_cast<uint32_t>(replay_buffer_.size())) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
return Status::Corruption("corrupted atomic group");
}
replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
return Status::OK();
}
return Status::OK();
}
if (!replay_buffer().empty()) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
return Status::Corruption("corrupted atomic group");
}
return Status::OK();
}
bool AtomicGroupReadBuffer::IsFull() const {
return read_edits_in_atomic_group_ == replay_buffer_.size();
}
bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
void AtomicGroupReadBuffer::Clear() {
read_edits_in_atomic_group_ = 0;
replay_buffer_.clear();
}
VersionSet::VersionSet(
const std::string& dbname, const ImmutableDBOptions* _db_options,
const MutableDBOptions& mutable_db_options,
const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager, WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
const std::string& db_session_id, const std::string& daily_offpeak_time_utc,
ErrorHandler* error_handler, bool unchanging)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer, io_tracer,
db_id, db_session_id)),
table_cache_(table_cache),
env_(_db_options->env),
fs_(_db_options->fs, io_tracer),
clock_(_db_options->clock),
dbname_(dbname),
db_id_(db_id),
db_options_(_db_options),
next_file_number_(2),
manifest_file_number_(0), options_file_number_(0),
options_file_size_(0),
pending_manifest_file_number_(0),
last_sequence_(0),
last_allocated_sequence_(0),
last_published_sequence_(0),
prev_log_number_(0),
current_version_number_(0),
manifest_file_size_(0),
last_compacted_manifest_file_size_(0),
file_options_(storage_options),
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer),
db_session_id_(db_session_id),
offpeak_time_option_(OffpeakTimeOption(daily_offpeak_time_utc)),
error_handler_(error_handler),
unchanging_(unchanging),
closed_(false) {
UpdatedMutableDbOptions(mutable_db_options, nullptr);
}
Status VersionSet::Close(FSDirectory* db_dir, InstrumentedMutex* mu) {
Status s;
if (closed_ || unchanging_ || !manifest_file_number_ || !descriptor_log_) {
return s;
}
std::string manifest_file_name =
DescriptorFileName(dbname_, manifest_file_number_);
uint64_t size = 0;
IOStatus io_s = descriptor_log_->Close(WriteOptions());
descriptor_log_.reset();
TEST_SYNC_POINT("VersionSet::Close:AfterClose");
if (io_s.ok()) {
io_s = fs_->GetFileSize(manifest_file_name, IOOptions(), &size, nullptr);
}
if (!io_s.ok() || size != manifest_file_size_) {
if (io_s.ok()) {
io_s = IOStatus::Corruption();
}
ColumnFamilyData* cfd = GetColumnFamilySet()->GetDefault();
IOErrorInfo io_error_info(io_s, FileOperationType::kVerify,
manifest_file_name, size,
0);
for (auto& listener : cfd->ioptions().listeners) {
listener->OnIOError(io_error_info);
}
io_s.PermitUncheckedError();
io_error_info.io_status.PermitUncheckedError();
ROCKS_LOG_ERROR(db_options_->info_log,
"MANIFEST verification on Close, "
"filename %s, expected size %" PRIu64
" failed with status %s and "
"actual size %" PRIu64 "\n",
manifest_file_name.c_str(), manifest_file_size_,
io_s.ToString().c_str(), size);
VersionEdit edit;
assert(cfd);
s = LogAndApply(cfd, ReadOptions(), WriteOptions(), &edit, mu, db_dir);
}
closed_ = true;
return s;
}
VersionSet::~VersionSet() {
column_family_set_.reset();
for (auto& file : obsolete_files_) {
TableCache::ReleaseObsolete(table_cache_, file.metadata->fd.GetNumber(),
file.metadata->table_reader_handle,
0);
file.DeleteMetadata();
}
obsolete_files_.clear();
io_status_.PermitUncheckedError();
}
void VersionSet::Reset() {
if (column_family_set_) {
WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
WriteController* wc = column_family_set_->write_controller();
if (table_cache_) {
table_cache_->EraseUnRefEntries();
}
column_family_set_.reset(new ColumnFamilySet(
dbname_, db_options_, file_options_, table_cache_, wbm, wc,
block_cache_tracer_, io_tracer_, db_id_, db_session_id_));
}
db_id_.clear();
next_file_number_.store(2);
min_log_number_to_keep_.store(0);
manifest_file_number_ = 0;
options_file_number_ = 0;
pending_manifest_file_number_ = 0;
last_sequence_.store(0);
last_allocated_sequence_.store(0);
last_published_sequence_.store(0);
prev_log_number_ = 0;
descriptor_log_.reset();
current_version_number_ = 0;
manifest_writers_.clear();
manifest_file_size_ = 0;
last_compacted_manifest_file_size_ = 0;
TuneMaxManifestFileSize();
obsolete_files_.clear();
obsolete_manifests_.clear();
wals_.Reset();
}
void VersionSet::UpdatedMutableDbOptions(
const MutableDBOptions& updated_options, InstrumentedMutex* mu) {
if (mu) {
mu->AssertHeld();
} else {
assert(manifest_file_size_ == 0);
}
file_options_.writable_file_max_buffer_size =
updated_options.writable_file_max_buffer_size;
min_max_manifest_file_size_ = updated_options.max_manifest_file_size;
max_manifest_space_amp_pct_ = static_cast<unsigned>(
std::max(updated_options.max_manifest_space_amp_pct, 0));
manifest_preallocation_size_ = updated_options.manifest_preallocation_size;
TuneMaxManifestFileSize();
}
void VersionSet::TuneMaxManifestFileSize() {
tuned_max_manifest_file_size_ =
std::max(min_max_manifest_file_size_,
last_compacted_manifest_file_size_ *
(100U + max_manifest_space_amp_pct_) / 100U);
}
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) {
v->storage_info()->ComputeCompactionScore(
column_family_data->ioptions(),
column_family_data->GetLatestMutableCFOptions(),
column_family_data->GetFullHistoryTsLow());
v->storage_info_.SetFinalized();
assert(v->refs_ == 0);
Version* current = column_family_data->current();
assert(v != current);
if (current != nullptr) {
assert(current->refs_ > 0);
current->Unref();
}
column_family_data->SetCurrent(v);
v->Ref();
v->prev_ = column_family_data->dummy_versions()->prev_;
v->next_ = column_family_data->dummy_versions();
v->prev_->next_ = v;
v->next_->prev_ = v;
}
Status VersionSet::ProcessManifestWrites(
std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
FSDirectory* dir_contains_current_file, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options, const ReadOptions& read_options,
const WriteOptions& write_options) {
mu->AssertHeld();
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
ManifestWriter* last_writer = &first_writer;
assert(!manifest_writers_.empty());
assert(manifest_writers_.front() == &first_writer);
autovector<VersionEdit*> batch_edits;
autovector<std::optional<size_t>> batch_edits_ts_sz;
autovector<Version*> versions;
std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
autovector<const autovector<uint64_t>*> files_to_quarantine_if_commit_fail;
autovector<uint64_t> limbo_descriptor_log_file_number;
SequenceNumber max_last_sequence = descriptor_last_sequence_;
bool skip_manifest_write =
first_writer.edit_list.front()->IsNoManifestWriteDummy();
if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence);
batch_edits.push_back(first_writer.edit_list.front());
batch_edits_ts_sz.push_back(std::nullopt);
} else {
auto it = manifest_writers_.cbegin();
size_t group_start = std::numeric_limits<size_t>::max();
for (;;) {
assert(!(*it)->edit_list.front()->IsColumnFamilyManipulation());
last_writer = *it;
assert(last_writer != nullptr);
assert(last_writer->cfd != nullptr);
if (last_writer->cfd->IsDropped()) {
if (!batch_edits.empty()) {
if (batch_edits.back()->IsInAtomicGroup() &&
batch_edits.back()->GetRemainingEntries() > 0) {
assert(group_start < batch_edits.size());
const auto& edit_list = last_writer->edit_list;
size_t k = 0;
while (k < edit_list.size()) {
if (!edit_list[k]->IsInAtomicGroup()) {
break;
} else if (edit_list[k]->GetRemainingEntries() == 0) {
++k;
break;
}
++k;
}
for (auto i = group_start; i < batch_edits.size(); ++i) {
assert(static_cast<uint32_t>(k) <=
batch_edits.back()->GetRemainingEntries());
batch_edits[i]->SetRemainingEntries(
batch_edits[i]->GetRemainingEntries() -
static_cast<uint32_t>(k));
}
}
}
} else {
Version* version = nullptr;
VersionBuilder* builder = nullptr;
for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
uint32_t cf_id = last_writer->cfd->GetID();
if (versions[i]->cfd()->GetID() == cf_id) {
version = versions[i];
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
builder = builder_guards[i]->version_builder();
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
break;
}
}
if (version == nullptr) {
if (!last_writer->IsAllWalEdits()) {
version = new Version(
last_writer->cfd, this, file_options_,
last_writer->cfd ? last_writer->cfd->GetLatestMutableCFOptions()
: MutableCFOptions(*new_cf_options),
io_tracer_, current_version_number_++);
versions.push_back(version);
builder_guards.emplace_back(
new BaseReferencedVersionBuilder(last_writer->cfd));
builder = builder_guards.back()->version_builder();
}
assert(last_writer->IsAllWalEdits() || builder);
assert(last_writer->IsAllWalEdits() || version);
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:NewVersion", version);
}
const Comparator* ucmp = last_writer->cfd->user_comparator();
assert(ucmp);
std::optional<size_t> edit_ts_sz = ucmp->timestamp_size();
for (const auto& e : last_writer->edit_list) {
if (e->IsInAtomicGroup()) {
if (batch_edits.empty() || !batch_edits.back()->IsInAtomicGroup() ||
(batch_edits.back()->IsInAtomicGroup() &&
batch_edits.back()->GetRemainingEntries() == 0)) {
group_start = batch_edits.size();
}
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
Status s = LogAndApplyHelper(last_writer->cfd, builder, e,
&max_last_sequence, mu);
if (!s.ok()) {
for (auto v : versions) {
delete v;
}
return s;
}
batch_edits.push_back(e);
batch_edits_ts_sz.push_back(edit_ts_sz);
}
}
++it;
if (it == manifest_writers_.cend()) {
break;
}
if (skip_manifest_write) {
break;
}
const auto* next = (*it)->edit_list.front();
if (next->IsColumnFamilyManipulation() ||
next->IsNoManifestWriteDummy()) {
break;
}
}
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
auto* builder = builder_guards[i]->version_builder();
Status s = builder->SaveTo(versions[i]->storage_info());
if (!s.ok()) {
for (auto v : versions) {
delete v;
}
return s;
}
}
}
#ifndef NDEBUG
size_t k = 0;
while (k < batch_edits.size()) {
while (k < batch_edits.size() && !batch_edits[k]->IsInAtomicGroup()) {
++k;
}
if (k == batch_edits.size()) {
break;
}
size_t i = k;
while (i < batch_edits.size()) {
if (!batch_edits[i]->IsInAtomicGroup()) {
break;
}
assert(i - k + batch_edits[i]->GetRemainingEntries() ==
batch_edits[k]->GetRemainingEntries());
if (batch_edits[i]->GetRemainingEntries() == 0) {
++i;
break;
}
++i;
}
assert(batch_edits[i - 1]->IsInAtomicGroup());
assert(0 == batch_edits[i - 1]->GetRemainingEntries());
std::vector<VersionEdit*> tmp;
for (size_t j = k; j != i; ++j) {
tmp.emplace_back(batch_edits[j]);
}
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
k = i;
}
if (skip_manifest_write) {
assert(last_writer == &first_writer);
}
#endif
uint64_t prev_manifest_file_size = manifest_file_size_;
assert(pending_manifest_file_number_ == 0);
if (!skip_manifest_write &&
(!descriptor_log_ ||
prev_manifest_file_size >= tuned_max_manifest_file_size_)) {
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
new_descriptor_log = true;
} else {
pending_manifest_file_number_ = manifest_file_number_;
}
std::unordered_map<uint32_t, MutableCFState> curr_state;
VersionEdit wal_additions;
if (new_descriptor_log) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
if (column_family_set_->GetMaxColumnFamily() > 0) {
first_writer.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
}
for (const auto* cfd : *column_family_set_) {
assert(curr_state.find(cfd->GetID()) == curr_state.end());
curr_state.emplace(
cfd->GetID(),
MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow()));
}
for (const auto& wal : wals_.GetWals()) {
wal_additions.AddWal(wal.first, wal.second);
}
}
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
IOStatus manifest_io_status;
manifest_io_status.PermitUncheckedError();
std::unique_ptr<log::Writer> new_desc_log_ptr;
uint64_t manifest_preallocation_size = manifest_preallocation_size_;
if (skip_manifest_write) {
if (s.ok()) {
constexpr bool update_stats = true;
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
versions[i]->PrepareAppend(read_options, update_stats);
}
}
} else {
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
if (file_options_.temperature != Temperature::kUnknown) {
opt_file_opts.temperature = file_options_.temperature;
}
mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
ColumnFamilyData* cfd = versions[i]->cfd_;
s = builder_guards[i]->version_builder()->LoadTableHandlers(
cfd->internal_stats(), 1 ,
true ,
false , versions[i]->GetMutableCFOptions(),
MaxFileSizeForL0MetaPin(versions[i]->GetMutableCFOptions()),
read_options);
if (!s.ok()) {
if (db_options_->paranoid_checks) {
break;
}
s = Status::OK();
}
}
}
log::Writer* raw_desc_log_ptr = descriptor_log_.get();
if (s.ok() && new_descriptor_log) {
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file;
io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
opt_file_opts);
if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize(manifest_preallocation_size);
FileTypeSet tmp_set = db_options_->checksum_handoff_file_types;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
io_tracer_, nullptr, Histograms::HISTOGRAM_ENUM_MAX ,
db_options_->listeners, nullptr,
tmp_set.Contains(FileType::kDescriptorFile),
tmp_set.Contains(FileType::kDescriptorFile)));
new_desc_log_ptr.reset(
new log::Writer(std::move(file_writer), 0, false));
raw_desc_log_ptr = new_desc_log_ptr.get();
s = WriteCurrentStateToManifest(write_options, curr_state,
wal_additions, raw_desc_log_ptr, io_s);
assert(s == io_s);
}
if (!io_s.ok()) {
manifest_io_status = io_s;
s = io_s;
}
}
if (s.ok()) {
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
constexpr bool update_stats = true;
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
versions[i]->PrepareAppend(read_options, update_stats);
}
}
#ifndef NDEBUG
size_t idx = 0;
#endif
assert(batch_edits.size() == batch_edits_ts_sz.size());
for (size_t bidx = 0; bidx < batch_edits.size(); bidx++) {
auto& e = batch_edits[bidx];
files_to_quarantine_if_commit_fail.push_back(
e->GetFilesToQuarantineIfCommitFail());
std::string record;
if (!e->EncodeTo(&record, batch_edits_ts_sz[bidx])) {
s = Status::Corruption("Unable to encode VersionEdit:" +
e->DebugString(true));
break;
}
TEST_KILL_RANDOM_WITH_WEIGHT("VersionSet::LogAndApply:BeforeAddRecord",
REDUCE_ODDS2);
#ifndef NDEBUG
if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:"
"0",
nullptr);
TEST_SYNC_POINT(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:"
"1");
}
++idx;
#endif
io_s = raw_desc_log_ptr->AddRecord(write_options, record);
if (!io_s.ok()) {
s = io_s;
manifest_io_status = io_s;
break;
}
}
if (s.ok()) {
io_s =
SyncManifest(db_options_, write_options, raw_desc_log_ptr->file());
manifest_io_status = io_s;
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
if (!io_s.ok()) {
s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
}
}
if (s.ok()) {
assert(manifest_io_status.ok());
}
if (s.ok() && new_descriptor_log) {
io_s = SetCurrentFile(
write_options, fs_.get(), dbname_, pending_manifest_file_number_,
file_options_.temperature, dir_contains_current_file);
if (!io_s.ok()) {
s = io_s;
limbo_descriptor_log_file_number.push_back(manifest_file_number_);
files_to_quarantine_if_commit_fail.push_back(
&limbo_descriptor_log_file_number);
}
}
if (s.ok()) {
new_manifest_file_size = raw_desc_log_ptr->file()->GetFileSize();
if (new_descriptor_log) {
ROCKS_LOG_INFO(db_options_->info_log,
"Created manifest %" PRIu64
", compacted+appended from %" PRIu64 " to %" PRIu64 "\n",
pending_manifest_file_number_, prev_manifest_file_size,
new_manifest_file_size);
}
}
if (first_writer.edit_list.front()->IsColumnFamilyDrop()) {
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
}
LogFlush(db_options_->info_log);
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
mu->Lock();
}
if (s.ok()) {
for (auto& e : batch_edits) {
if (e->IsWalAddition()) {
s = wals_.AddWals(e->GetWalAdditions());
} else if (e->IsWalDeletion()) {
s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
}
if (!s.ok()) {
break;
}
}
}
if (!io_s.ok()) {
if (io_status_.ok()) {
io_status_ = io_s;
if (error_handler_) {
error_handler_->AddFilesToQuarantine(
files_to_quarantine_if_commit_fail);
}
}
} else if (!io_status_.ok()) {
io_status_ = io_s;
if (error_handler_) {
error_handler_->ClearFilesToQuarantine();
}
}
if (s.ok() && new_descriptor_log) {
descriptor_log_ = std::move(new_desc_log_ptr);
obsolete_manifests_.emplace_back(
DescriptorFileName("", manifest_file_number_));
last_compacted_manifest_file_size_ = new_manifest_file_size;
TuneMaxManifestFileSize();
}
if (s.ok()) {
if (first_writer.edit_list.front()->IsColumnFamilyAdd()) {
assert(batch_edits.size() == 1);
assert(new_cf_options != nullptr);
assert(max_last_sequence == descriptor_last_sequence_);
CreateColumnFamily(*new_cf_options, read_options,
first_writer.edit_list.front(),
false);
} else if (first_writer.edit_list.front()->IsColumnFamilyDrop()) {
assert(batch_edits.size() == 1);
assert(max_last_sequence == descriptor_last_sequence_);
first_writer.cfd->SetDropped();
first_writer.cfd->UnrefAndTryDelete();
} else {
uint64_t last_min_log_number_to_keep = 0;
for (const auto& e : batch_edits) {
ColumnFamilyData* cfd = nullptr;
if (!e->IsColumnFamilyManipulation()) {
cfd = column_family_set_->GetColumnFamily(e->GetColumnFamily());
assert(cfd);
}
if (cfd) {
if (e->HasLogNumber() && e->GetLogNumber() > cfd->GetLogNumber()) {
cfd->SetLogNumber(e->GetLogNumber());
}
if (e->HasFullHistoryTsLow()) {
cfd->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
}
}
if (e->HasMinLogNumberToKeep()) {
last_min_log_number_to_keep =
std::max(last_min_log_number_to_keep, e->GetMinLogNumberToKeep());
}
}
if (last_min_log_number_to_keep != 0) {
MarkMinLogNumberToKeep(last_min_log_number_to_keep);
}
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
ColumnFamilyData* cfd = versions[i]->cfd_;
AppendVersion(cfd, versions[i]);
}
}
if (!skip_manifest_write) {
assert(max_last_sequence >= descriptor_last_sequence_);
descriptor_last_sequence_ = max_last_sequence;
manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = first_writer.edit_list.front()->GetPrevLogNumber();
}
} else {
std::string version_edits;
for (auto& e : batch_edits) {
version_edits += ("\n" + e->DebugString(true));
}
ROCKS_LOG_ERROR(db_options_->info_log,
"Error in committing version edit to MANIFEST: %s",
version_edits.c_str());
for (auto v : versions) {
delete v;
}
descriptor_log_.reset();
new_desc_log_ptr.reset();
if (!manifest_io_status.ok() && new_descriptor_log) {
ROCKS_LOG_INFO(db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
"\n",
pending_manifest_file_number_, manifest_file_number_);
Status manifest_del_status = env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
if (!manifest_del_status.ok()) {
ROCKS_LOG_WARN(db_options_->info_log,
"Failed to delete manifest %" PRIu64 ": %s",
pending_manifest_file_number_,
manifest_del_status.ToString().c_str());
}
}
}
pending_manifest_file_number_ = 0;
#ifndef NDEBUG
if (s.ok()) {
for (const auto* v : versions) {
const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
for (const auto& file : vstorage->LevelFiles(level)) {
assert(file->fd.largest_seqno <= descriptor_last_sequence_);
}
}
}
}
#endif
while (true) {
ManifestWriter* ready = manifest_writers_.front();
manifest_writers_.pop_front();
bool need_signal = true;
for (const auto& w : writers) {
if (&w == ready) {
need_signal = false;
break;
}
}
ready->status = s;
ready->done = true;
if (ready->manifest_write_callback) {
(ready->manifest_write_callback)(s);
}
if (need_signal) {
ready->cv.Signal();
}
if (ready == last_writer) {
break;
}
}
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return s;
}
void VersionSet::WakeUpWaitingManifestWriters() {
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
}
Status VersionSet::LogAndApply(
const autovector<ColumnFamilyData*>& column_family_datas,
const ReadOptions& read_options, const WriteOptions& write_options,
const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, FSDirectory* dir_contains_current_file,
bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options,
const std::vector<std::function<void(const Status&)>>& manifest_wcbs,
const std::function<Status()>& pre_cb) {
mu->AssertHeld();
int num_edits = 0;
for (const auto& elist : edit_lists) {
num_edits += static_cast<int>(elist.size());
}
if (num_edits == 0) {
return Status::OK();
} else if (num_edits > 1) {
#ifndef NDEBUG
for (const auto& edit_list : edit_lists) {
for (const auto& edit : edit_list) {
assert(!edit->IsColumnFamilyManipulation());
assert(!edit->IsNoManifestWriteDummy());
}
}
#endif
}
int num_cfds = static_cast<int>(column_family_datas.size());
if (num_cfds == 1 && column_family_datas[0] == nullptr) {
assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
assert(edit_lists[0][0]->IsColumnFamilyAdd());
assert(new_cf_options != nullptr);
}
std::deque<ManifestWriter> writers;
if (num_cfds > 0) {
assert(static_cast<size_t>(num_cfds) == edit_lists.size());
}
for (int i = 0; i < num_cfds; ++i) {
const auto wcb =
manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
writers.emplace_back(mu, column_family_datas[i], edit_lists[i], wcb);
manifest_writers_.push_back(&writers[i]);
}
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
nullptr);
while (!first_writer.done && &first_writer != manifest_writers_.front()) {
first_writer.cv.Wait();
}
if (first_writer.done) {
#ifndef NDEBUG
for (const auto& writer : writers) {
assert(writer.done);
}
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
#endif
return first_writer.status;
}
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndNotDone", mu);
int num_undropped_cfds = 0;
for (auto cfd : column_family_datas) {
if (cfd == nullptr || !cfd->IsDropped()) {
++num_undropped_cfds;
}
}
Status s;
if (0 == num_undropped_cfds) {
s = Status::ColumnFamilyDropped();
}
if (s.ok() && pre_cb) {
s = pre_cb();
}
if (!s.ok()) {
for (int i = 0; i != num_cfds; ++i) {
manifest_writers_.pop_front();
}
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return s;
} else {
return ProcessManifestWrites(writers, mu, dir_contains_current_file,
new_descriptor_log, new_cf_options,
read_options, write_options);
}
}
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit,
SequenceNumber* max_last_sequence) {
assert(max_last_sequence != nullptr);
assert(edit->IsColumnFamilyManipulation());
edit->SetNextFile(next_file_number_.load());
assert(!edit->HasLastSequence());
edit->SetLastSequence(*max_last_sequence);
if (edit->IsColumnFamilyDrop()) {
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
}
}
Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, VersionEdit* edit,
SequenceNumber* max_last_sequence,
InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
mu->AssertHeld();
assert(!edit->IsColumnFamilyManipulation());
assert(max_last_sequence != nullptr);
if (edit->HasLogNumber()) {
assert(edit->GetLogNumber() >= cfd->GetLogNumber());
assert(edit->GetLogNumber() < next_file_number_.load());
}
if (!edit->HasPrevLogNumber()) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_.load());
if (edit->HasLastSequence() && edit->GetLastSequence() > *max_last_sequence) {
*max_last_sequence = edit->GetLastSequence();
} else {
edit->SetLastSequence(*max_last_sequence);
}
assert(builder || edit->IsWalManipulation());
return builder ? builder->Apply(edit) : Status::OK();
}
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool no_error_if_files_missing, bool is_retry,
Status* log_status) {
const ReadOptions read_options(Env::IOActivity::kDBOpen);
std::string manifest_path;
Status s = GetCurrentManifestPath(dbname_, fs_.get(), is_retry,
&manifest_path, &manifest_file_number_);
if (!s.ok()) {
return s;
}
ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
manifest_path.c_str());
std::unique_ptr<SequentialFileReader> manifest_file_reader;
{
std::unique_ptr<FSSequentialFile> manifest_file;
s = fs_->NewSequentialFile(manifest_path,
fs_->OptimizeForManifestRead(file_options_),
&manifest_file, nullptr);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners,
nullptr, is_retry));
}
TEST_SYNC_POINT("VersionSet::Recover:StartManifestRead");
uint64_t current_manifest_file_size = 0;
uint64_t log_number = 0;
{
VersionSet::LogReporter reporter;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true , 0 );
VersionEditHandler handler(
read_only, column_families, const_cast<VersionSet*>(this),
false, no_error_if_files_missing,
io_tracer_, read_options, false,
EpochNumberRequirement::kMightMissing);
handler.Iterate(reader, &log_read_status);
s = handler.status();
if (s.ok()) {
log_number = handler.GetVersionEditParams().GetLogNumber();
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
handler.GetDbId(db_id);
}
if (s.ok()) {
RecoverEpochNumbers();
}
if (log_status) {
*log_status = log_read_status;
}
}
if (s.ok()) {
manifest_file_size_ = current_manifest_file_size;
ROCKS_LOG_INFO(
db_options_->info_log,
"Recovered from manifest file:%s succeeded,"
"manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
", last_sequence is %" PRIu64 ", log_number is %" PRIu64
",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
last_sequence_.load(), log_number, prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
ROCKS_LOG_INFO(db_options_->info_log,
"Column family [%s] (ID %" PRIu32
"), log number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
}
}
return s;
}
namespace {
class ManifestPicker {
public:
explicit ManifestPicker(const std::string& dbname,
const std::vector<std::string>& files_in_dbname);
std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
private:
const std::string& dbname_;
std::vector<std::string> manifest_files_;
std::vector<std::string>::const_iterator manifest_file_iter_;
};
ManifestPicker::ManifestPicker(const std::string& dbname,
const std::vector<std::string>& files_in_dbname)
: dbname_(dbname) {
assert(!files_in_dbname.empty());
for (const auto& fname : files_in_dbname) {
uint64_t file_num = 0;
FileType file_type;
bool parse_ok = ParseFileName(fname, &file_num, &file_type);
if (parse_ok && file_type == kDescriptorFile) {
manifest_files_.push_back(fname);
}
}
std::sort(manifest_files_.begin(), manifest_files_.end(),
[](const std::string& lhs, const std::string& rhs) {
uint64_t num1 = 0;
uint64_t num2 = 0;
FileType type1;
FileType type2;
bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
#ifndef NDEBUG
assert(parse_ok1);
assert(parse_ok2);
#else
(void)parse_ok1;
(void)parse_ok2;
#endif
return num1 > num2;
});
manifest_file_iter_ = manifest_files_.begin();
}
std::string ManifestPicker::GetNextManifest(uint64_t* number,
std::string* file_name) {
assert(Valid());
std::string ret;
if (manifest_file_iter_ != manifest_files_.end()) {
ret.assign(dbname_);
if (ret.back() != kFilePathSeparator) {
ret.push_back(kFilePathSeparator);
}
ret.append(*manifest_file_iter_);
if (number) {
FileType type;
bool parse = ParseFileName(*manifest_file_iter_, number, &type);
assert(type == kDescriptorFile);
#ifndef NDEBUG
assert(parse);
#else
(void)parse;
#endif
}
if (file_name) {
*file_name = *manifest_file_iter_;
}
++manifest_file_iter_;
}
return ret;
}
}
Status VersionSet::TryRecover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
const std::vector<std::string>& files_in_dbname, std::string* db_id,
bool* has_missing_table_file) {
ManifestPicker manifest_picker(dbname_, files_in_dbname);
if (!manifest_picker.Valid()) {
return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
}
Status s;
std::string manifest_path =
manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
while (!manifest_path.empty()) {
s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
db_id, has_missing_table_file);
if (s.ok() || !manifest_picker.Valid()) {
break;
}
Reset();
manifest_path =
manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
}
return s;
}
Status VersionSet::TryRecoverFromOneManifest(
const std::string& manifest_path,
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool* has_missing_table_file) {
const ReadOptions read_options(Env::IOActivity::kDBOpen);
ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
manifest_path.c_str());
std::unique_ptr<SequentialFileReader> manifest_file_reader;
Status s;
{
std::unique_ptr<FSSequentialFile> manifest_file;
s = fs_->NewSequentialFile(manifest_path,
fs_->OptimizeForManifestRead(file_options_),
&manifest_file, nullptr);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
}
assert(s.ok());
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true, 0);
VersionEditHandlerPointInTime handler_pit(
read_only, column_families, const_cast<VersionSet*>(this), io_tracer_,
read_options, true,
EpochNumberRequirement::kMightMissing);
handler_pit.Iterate(reader, &s);
handler_pit.GetDbId(db_id);
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
s = handler_pit.status();
if (s.ok()) {
RecoverEpochNumbers();
}
return s;
}
void VersionSet::RecoverEpochNumbers() {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
cfd->RecoverEpochNumbers();
}
}
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
const std::string& dbname,
FileSystem* fs) {
std::string manifest_path;
uint64_t manifest_file_number;
Status s = GetCurrentManifestPath(dbname, fs, false,
&manifest_path, &manifest_file_number);
if (!s.ok()) {
return s;
}
return ListColumnFamiliesFromManifest(manifest_path, fs, column_families);
}
Status VersionSet::ListColumnFamiliesFromManifest(
const std::string& manifest_path, FileSystem* fs,
std::vector<std::string>* column_families) {
const ReadOptions read_options;
std::unique_ptr<SequentialFileReader> file_reader;
Status s;
{
std::unique_ptr<FSSequentialFile> file;
s = fs->NewSequentialFile(manifest_path, FileOptions(), &file, nullptr);
if (!s.ok()) {
return s;
}
file_reader = std::make_unique<SequentialFileReader>(
std::move(file), manifest_path, nullptr);
}
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true , 0 );
ListColumnFamiliesHandler handler(read_options);
handler.Iterate(reader, &s);
assert(column_families);
column_families->clear();
if (handler.status().ok()) {
for (const auto& iter : handler.GetColumnFamilyNames()) {
column_families->push_back(iter.second);
}
}
return handler.status();
}
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
const FileOptions& file_options,
int new_levels) {
if (new_levels <= 1) {
return Status::InvalidArgument(
"Number of levels needs to be bigger than 1");
}
const ReadOptions read_options;
const WriteOptions write_options;
ImmutableDBOptions imm_db_options(*options);
MutableDBOptions mutable_db_options(*options);
ColumnFamilyOptions cf_options(*options);
std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
options->table_cache_numshardbits));
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &imm_db_options, mutable_db_options, file_options,
tc.get(), &wb, &wc, nullptr ,
nullptr ,
"",
"", options->daily_offpeak_time_utc,
nullptr, false);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(*options));
dummy.push_back(dummy_descriptor);
status = versions.Recover(dummy);
if (!status.ok()) {
return status;
}
Version* current_version =
versions.GetColumnFamilySet()->GetDefault()->current();
auto* vstorage = current_version->storage_info();
int current_levels = vstorage->num_levels();
if (current_levels <= new_levels) {
return Status::OK();
}
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = vstorage->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
first_nonempty_level_filenum = file_num;
} else {
char msg[255];
snprintf(msg, sizeof(msg),
"Found at least two levels containing files: "
"[%d:%d],[%d:%d].\n",
first_nonempty_level, first_nonempty_level_filenum, i,
file_num);
return Status::InvalidArgument(msg);
}
}
}
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[current_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = vstorage->LevelFiles(i);
}
if (first_nonempty_level > 0) {
auto& new_last_level = new_files_list[new_levels - 1];
new_last_level = vstorage->LevelFiles(first_nonempty_level);
for (size_t i = 0; i < new_last_level.size(); ++i) {
const FileMetaData* const meta = new_last_level[i];
assert(meta);
const uint64_t file_number = meta->fd.GetNumber();
vstorage->file_locations_[file_number] =
VersionStorageInfo::FileLocation(new_levels - 1, i);
}
}
delete[] vstorage->files_;
vstorage->files_ = new_files_list;
vstorage->num_levels_ = new_levels;
vstorage->ResizeCompactCursors(new_levels);
VersionEdit ve;
InstrumentedMutex dummy_mutex;
InstrumentedMutexLock l(&dummy_mutex);
return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(),
read_options, write_options, &ve, &dummy_mutex,
nullptr, true);
}
Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
Status s;
if (checksum_list == nullptr) {
s = Status::InvalidArgument("checksum_list is nullptr");
return s;
}
checksum_list->reset();
for (auto cfd : *column_family_set_) {
assert(cfd);
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
const auto* current = cfd->current();
assert(current);
const auto* vstorage = current->storage_info();
assert(vstorage);
for (int level = 0; level < cfd->NumberLevels(); level++) {
const auto& level_files = vstorage->LevelFiles(level);
for (const auto& file : level_files) {
assert(file);
s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
file->file_checksum,
file->file_checksum_func_name);
if (!s.ok()) {
return s;
}
}
}
const auto& blob_files = vstorage->GetBlobFiles();
for (const auto& meta : blob_files) {
assert(meta);
std::string checksum_value = meta->GetChecksumValue();
std::string checksum_method = meta->GetChecksumMethod();
assert(checksum_value.empty() == checksum_method.empty());
if (meta->GetChecksumMethod().empty()) {
checksum_value = kUnknownFileChecksum;
checksum_method = kUnknownFileChecksumFuncName;
}
s = checksum_list->InsertOneFileChecksum(meta->GetBlobFileNumber(),
checksum_value, checksum_method);
if (!s.ok()) {
return s;
}
}
}
return s;
}
Status VersionSet::DumpManifest(
Options& options, std::string& dscname, bool verbose, bool hex, bool json,
const std::vector<ColumnFamilyDescriptor>& cf_descs) {
assert(options.env);
const ReadOptions read_options;
std::vector<std::string> column_families;
Status s = ListColumnFamiliesFromManifest(
dscname, options.env->GetFileSystem().get(), &column_families);
if (!s.ok()) {
return s;
}
std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<FSSequentialFile> file;
const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
s = fs->NewSequentialFile(
dscname, fs->OptimizeForManifestRead(file_options_), &file, nullptr);
if (!s.ok()) {
return s;
}
file_reader = std::make_unique<SequentialFileReader>(
std::move(file), dscname, db_options_->log_readahead_size, io_tracer_);
}
std::map<std::string, const ColumnFamilyDescriptor*> cf_name_to_desc;
for (const auto& cf_desc : cf_descs) {
cf_name_to_desc[cf_desc.name] = &cf_desc;
}
std::vector<ColumnFamilyDescriptor> final_cf_descs;
for (const auto& cf : column_families) {
const auto iter = cf_name_to_desc.find(cf);
if (iter != cf_name_to_desc.cend()) {
final_cf_descs.push_back(*iter->second);
} else {
final_cf_descs.emplace_back(cf, options);
}
}
DumpManifestHandler handler(final_cf_descs, this, io_tracer_, read_options,
verbose, hex, json);
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true , 0 );
handler.Iterate(reader, &s);
}
return handler.status();
}
void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_.load(std::memory_order_relaxed) <= number) {
next_file_number_.store(number + 1, std::memory_order_relaxed);
}
}
void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_.store(number, std::memory_order_relaxed);
}
}
Status VersionSet::WriteCurrentStateToManifest(
const WriteOptions& write_options,
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
edit_for_db_id.SetDBId(db_id_);
std::string db_id_record;
if (!edit_for_db_id.EncodeTo(&db_id_record)) {
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
io_s = log->AddRecord(write_options, db_id_record);
if (!io_s.ok()) {
return io_s;
}
}
if (!wal_additions.GetWalAdditions().empty()) {
TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
const_cast<VersionEdit*>(&wal_additions));
std::string record;
if (!wal_additions.EncodeTo(&record)) {
return Status::Corruption("Unable to Encode VersionEdit: " +
wal_additions.DebugString(true));
}
io_s = log->AddRecord(write_options, record);
if (!io_s.ok()) {
return io_s;
}
}
VersionEdit wal_deletions;
wal_deletions.DeleteWalsBefore(min_log_number_to_keep());
std::string wal_deletions_record;
if (!wal_deletions.EncodeTo(&wal_deletions_record)) {
return Status::Corruption("Unable to Encode VersionEdit: " +
wal_deletions.DebugString(true));
}
io_s = log->AddRecord(write_options, wal_deletions_record);
if (!io_s.ok()) {
return io_s;
}
for (auto cfd : *column_family_set_) {
assert(cfd);
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
{
VersionEdit edit;
if (cfd->GetID() != 0) {
edit.AddColumnFamily(cfd->GetName());
edit.SetColumnFamily(cfd->GetID());
}
edit.SetComparatorName(
cfd->internal_comparator().user_comparator()->Name());
edit.SetPersistUserDefinedTimestamps(
cfd->ioptions().persist_user_defined_timestamps);
std::string record;
if (!edit.EncodeTo(&record)) {
return Status::Corruption("Unable to Encode VersionEdit:" +
edit.DebugString(true));
}
io_s = log->AddRecord(write_options, record);
if (!io_s.ok()) {
return io_s;
}
}
{
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
const auto* current = cfd->current();
assert(current);
const auto* vstorage = current->storage_info();
assert(vstorage);
for (int level = 0; level < cfd->NumberLevels(); level++) {
const auto& level_files = vstorage->LevelFiles(level);
for (const auto& f : level_files) {
assert(f);
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->temperature,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->epoch_number, f->file_checksum,
f->file_checksum_func_name, f->unique_id,
f->compensated_range_deletion_size, f->tail_size,
f->user_defined_timestamps_persisted, f->min_timestamp,
f->max_timestamp);
}
}
edit.SetCompactCursors(vstorage->GetCompactCursors());
const auto& blob_files = vstorage->GetBlobFiles();
for (const auto& meta : blob_files) {
assert(meta);
const uint64_t blob_file_number = meta->GetBlobFileNumber();
edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
meta->GetChecksumValue());
if (meta->GetGarbageBlobCount() > 0) {
edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
meta->GetGarbageBlobBytes());
}
}
const auto iter = curr_state.find(cfd->GetID());
assert(iter != curr_state.end());
uint64_t log_number = iter->second.log_number;
edit.SetLogNumber(log_number);
if (cfd->GetID() == 0) {
uint64_t min_log = min_log_number_to_keep();
if (min_log != 0) {
edit.SetMinLogNumberToKeep(min_log);
}
}
const std::string& full_history_ts_low = iter->second.full_history_ts_low;
if (!full_history_ts_low.empty()) {
edit.SetFullHistoryTsLow(full_history_ts_low);
}
edit.SetLastSequence(descriptor_last_sequence_);
const Comparator* ucmp = cfd->user_comparator();
assert(ucmp);
std::string record;
if (!edit.EncodeTo(&record, ucmp->timestamp_size())) {
return Status::Corruption("Unable to Encode VersionEdit:" +
edit.DebugString(true));
}
io_s = log->AddRecord(write_options, record);
if (!io_s.ok()) {
return io_s;
}
}
}
return Status::OK();
}
uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
const ReadOptions& read_options,
Version* v, const Slice& start,
const Slice& end, int start_level,
int end_level, TableReaderCaller caller) {
const auto& icmp = v->cfd_->internal_comparator();
assert(icmp.Compare(start, end) <= 0);
uint64_t total_full_size = 0;
const auto* vstorage = v->storage_info();
const int num_non_empty_levels = vstorage->num_non_empty_levels();
end_level = (end_level == -1) ? num_non_empty_levels
: std::min(end_level, num_non_empty_levels);
if (end_level <= start_level) {
return 0;
}
autovector<FdWithKeyRange*, 32> first_files;
autovector<FdWithKeyRange*, 16> last_files;
for (int level = start_level; level < end_level; ++level) {
const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
if (files_brief.num_files == 0) {
continue;
}
if (level == 0) {
for (size_t i = 0; i < files_brief.num_files; i++) {
first_files.push_back(&files_brief.files[i]);
}
continue;
}
assert(level > 0);
assert(files_brief.num_files > 0);
const int idx_start =
FindFileInRange(icmp, files_brief, start, 0,
static_cast<uint32_t>(files_brief.num_files - 1));
assert(static_cast<size_t>(idx_start) < files_brief.num_files);
int idx_end = idx_start;
if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
idx_end =
FindFileInRange(icmp, files_brief, end, idx_start,
static_cast<uint32_t>(files_brief.num_files - 1));
}
assert(idx_end >= idx_start &&
static_cast<size_t>(idx_end) < files_brief.num_files);
for (int i = idx_start + 1; i < idx_end; ++i) {
uint64_t file_size = files_brief.files[i].fd.GetFileSize();
assert(file_size == ApproximateSize(read_options, v, files_brief.files[i],
start, end, caller));
total_full_size += file_size;
}
first_files.push_back(&files_brief.files[idx_start]);
if (idx_start != idx_end) {
last_files.push_back(&files_brief.files[idx_end]);
}
}
uint64_t total_intersecting_size = 0;
for (const auto* file_ptr : first_files) {
total_intersecting_size += file_ptr->fd.GetFileSize();
}
for (const auto* file_ptr : last_files) {
total_intersecting_size += file_ptr->fd.GetFileSize();
}
const double margin = options.files_size_error_margin;
if (margin > 0 && total_intersecting_size <
static_cast<uint64_t>(total_full_size * margin)) {
total_full_size += total_intersecting_size / 2;
} else {
for (const auto file_ptr : first_files) {
total_full_size +=
ApproximateSize(read_options, v, *file_ptr, start, end, caller);
}
for (const auto file_ptr : last_files) {
total_full_size +=
ApproximateOffsetOf(read_options, v, *file_ptr, end, caller);
}
}
return total_full_size;
}
uint64_t VersionSet::ApproximateOffsetOf(const ReadOptions& read_options,
Version* v, const FdWithKeyRange& f,
const Slice& key,
TableReaderCaller caller) {
assert(v);
const auto& icmp = v->cfd_->internal_comparator();
uint64_t result = 0;
if (icmp.Compare(f.largest_key, key) <= 0) {
result = f.fd.GetFileSize();
} else if (icmp.Compare(f.smallest_key, key) > 0) {
result = 0;
} else {
TableCache* table_cache = v->cfd_->table_cache();
const MutableCFOptions& cf_opts = v->GetMutableCFOptions();
if (table_cache != nullptr) {
result = table_cache->ApproximateOffsetOf(
read_options, key, *f.file_metadata, caller, icmp, cf_opts);
}
}
return result;
}
uint64_t VersionSet::ApproximateSize(const ReadOptions& read_options,
Version* v, const FdWithKeyRange& f,
const Slice& start, const Slice& end,
TableReaderCaller caller) {
assert(v);
const auto& icmp = v->cfd_->internal_comparator();
assert(icmp.Compare(start, end) <= 0);
if (icmp.Compare(f.largest_key, start) <= 0 ||
icmp.Compare(f.smallest_key, end) > 0) {
return 0;
}
if (icmp.Compare(f.smallest_key, start) >= 0) {
return ApproximateOffsetOf(read_options, v, f, end, caller);
}
if (icmp.Compare(f.largest_key, end) < 0) {
uint64_t start_offset =
ApproximateOffsetOf(read_options, v, f, start, caller);
assert(f.fd.GetFileSize() >= start_offset);
return f.fd.GetFileSize() - start_offset;
}
TableCache* table_cache = v->cfd_->table_cache();
if (table_cache == nullptr) {
return 0;
}
const MutableCFOptions& cf_opts = v->GetMutableCFOptions();
return table_cache->ApproximateSize(read_options, start, end,
*f.file_metadata, caller, icmp, cf_opts);
}
void VersionSet::RemoveLiveFiles(
std::vector<ObsoleteFileInfo>& sst_delete_candidates,
std::vector<ObsoleteBlobFileInfo>& blob_delete_candidates) const {
assert(column_family_set_);
for (auto cfd : *column_family_set_) {
assert(cfd);
if (!cfd->initialized()) {
continue;
}
auto* current = cfd->current();
bool found_current = false;
Version* const dummy_versions = cfd->dummy_versions();
assert(dummy_versions);
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
v->RemoveLiveFiles(sst_delete_candidates, blob_delete_candidates);
if (v == current) {
found_current = true;
}
}
if (!found_current && current != nullptr) {
assert(false);
current->RemoveLiveFiles(sst_delete_candidates, blob_delete_candidates);
}
}
}
void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);
assert(live_blob_files);
size_t total_table_files = 0;
size_t total_blob_files = 0;
assert(column_family_set_);
for (auto cfd : *column_family_set_) {
assert(cfd);
if (!cfd->initialized()) {
continue;
}
Version* const dummy_versions = cfd->dummy_versions();
assert(dummy_versions);
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
assert(v);
const auto* vstorage = v->storage_info();
assert(vstorage);
for (int level = 0; level < vstorage->num_levels(); ++level) {
total_table_files += vstorage->LevelFiles(level).size();
}
total_blob_files += vstorage->GetBlobFiles().size();
}
}
live_table_files->reserve(live_table_files->size() + total_table_files);
live_blob_files->reserve(live_blob_files->size() + total_blob_files);
assert(column_family_set_);
for (auto cfd : *column_family_set_) {
assert(cfd);
if (!cfd->initialized()) {
continue;
}
auto* current = cfd->current();
bool found_current = false;
Version* const dummy_versions = cfd->dummy_versions();
assert(dummy_versions);
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
v->AddLiveFiles(live_table_files, live_blob_files);
if (v == current) {
found_current = true;
}
}
if (!found_current && current != nullptr) {
assert(false);
current->AddLiveFiles(live_table_files, live_blob_files);
}
}
}
InternalIterator* VersionSet::MakeInputIterator(
const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions,
const std::optional<const Slice>& start,
const std::optional<const Slice>& end) {
auto cfd = c->column_family_data();
const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
c->num_input_levels() - 1
: c->num_input_levels());
InternalIterator** list = new InternalIterator*[space];
std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
std::unique_ptr<TruncatedRangeDelIterator>**>>
range_tombstones;
size_t num = 0;
[[maybe_unused]] size_t num_input_files = 0;
for (size_t which = 0; which < c->num_input_levels(); which++) {
const LevelFilesBrief* flevel = c->input_levels(which);
num_input_files += flevel->num_files;
if (flevel->num_files != 0) {
if (c->level(which) == 0) {
for (size_t i = 0; i < flevel->num_files; i++) {
const FileMetaData& fmd = *flevel->files[i].file_metadata;
if (start.has_value() &&
cfd->user_comparator()->CompareWithoutTimestamp(
*start, fmd.largest.user_key()) > 0) {
continue;
}
if (end.has_value() &&
cfd->user_comparator()->CompareWithoutTimestamp(
*end, fmd.smallest.user_key()) < 0) {
continue;
}
std::unique_ptr<TruncatedRangeDelIterator> range_tombstone_iter =
nullptr;
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
cfd->internal_comparator(), fmd, range_del_agg,
c->mutable_cf_options(),
nullptr,
nullptr, TableReaderCaller::kCompaction,
nullptr,
false,
static_cast<int>(c->level(which)),
MaxFileSizeForL0MetaPin(c->mutable_cf_options()),
nullptr,
nullptr,
false,
nullptr,
&range_tombstone_iter);
range_tombstones.emplace_back(std::move(range_tombstone_iter),
nullptr);
}
} else {
std::unique_ptr<TruncatedRangeDelIterator>** tombstone_iter_ptr =
nullptr;
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), flevel, c->mutable_cf_options(),
false,
nullptr,
TableReaderCaller::kCompaction, false,
static_cast<int>(c->level(which)), range_del_agg,
c->boundaries(which), false, &tombstone_iter_ptr,
db_options_->statistics.get(), clock_);
range_tombstones.emplace_back(nullptr, tombstone_iter_ptr);
}
}
}
TEST_SYNC_POINT_CALLBACK(
"VersionSet::MakeInputIterator:NewCompactionMergingIterator",
&num_input_files);
assert(num <= space);
InternalIterator* result = NewCompactionMergingIterator(
&c->column_family_data()->internal_comparator(), list,
static_cast<int>(num), range_tombstones, nullptr,
c->column_family_data()->internal_stats());
delete[] list;
return result;
}
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData** meta,
ColumnFamilyData** cfd) {
for (auto cfd_iter : *column_family_set_) {
if (!cfd_iter->initialized()) {
continue;
}
Version* version = cfd_iter->current();
const auto* vstorage = version->storage_info();
for (int level = 0; level < vstorage->num_levels(); level++) {
for (const auto& file : vstorage->LevelFiles(level)) {
if (file->fd.GetNumber() == number) {
*meta = file;
*filelevel = level;
*cfd = cfd_iter;
return Status::OK();
}
}
}
}
return Status::NotFound("File not present in any level");
}
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
if (!metadata) {
return;
}
assert(metadata);
size_t count = 0;
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) {
count += cfd->current()->storage_info()->LevelFiles(level).size();
}
}
metadata->reserve(count);
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file :
cfd->current()->storage_info()->LevelFiles(level)) {
LiveFileMetaData filemetadata;
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();
if (path_id < cfd->ioptions().cf_paths.size()) {
filemetadata.db_path = cfd->ioptions().cf_paths[path_id].path;
} else {
assert(!cfd->ioptions().cf_paths.empty());
filemetadata.db_path = cfd->ioptions().cf_paths.back().path;
}
filemetadata.directory = filemetadata.db_path;
const uint64_t file_number = file->fd.GetNumber();
filemetadata.name = MakeTableFileName("", file_number);
filemetadata.relative_filename = filemetadata.name.substr(1);
filemetadata.file_number = file_number;
filemetadata.level = level;
filemetadata.size = file->fd.GetFileSize();
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = file->largest.user_key().ToString();
filemetadata.smallest_seqno = file->fd.smallest_seqno;
filemetadata.largest_seqno = file->fd.largest_seqno;
filemetadata.num_reads_sampled =
file->stats.num_reads_sampled.load(std::memory_order_relaxed);
filemetadata.being_compacted = file->being_compacted;
filemetadata.num_entries = file->num_entries;
filemetadata.num_deletions = file->num_deletions;
filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
filemetadata.file_checksum = file->file_checksum;
filemetadata.file_checksum_func_name = file->file_checksum_func_name;
filemetadata.temperature = file->temperature;
filemetadata.oldest_ancester_time = file->TryGetOldestAncesterTime();
filemetadata.file_creation_time = file->TryGetFileCreationTime();
filemetadata.epoch_number = file->epoch_number;
metadata->push_back(filemetadata);
}
}
}
}
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<ObsoleteBlobFileInfo>* blob_files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
assert(files);
assert(blob_files);
assert(manifest_filenames);
assert(files->empty());
assert(blob_files->empty());
assert(manifest_filenames->empty());
std::vector<ObsoleteFileInfo> pending_files;
for (auto& f : obsolete_files_) {
if (f.metadata->fd.GetNumber() < min_pending_output) {
files->emplace_back(std::move(f));
} else {
pending_files.emplace_back(std::move(f));
}
}
obsolete_files_.swap(pending_files);
std::vector<ObsoleteBlobFileInfo> pending_blob_files;
for (auto& blob_file : obsolete_blob_files_) {
if (blob_file.GetBlobFileNumber() < min_pending_output) {
blob_files->emplace_back(std::move(blob_file));
} else {
pending_blob_files.emplace_back(std::move(blob_file));
}
}
obsolete_blob_files_.swap(pending_blob_files);
obsolete_manifests_.swap(*manifest_filenames);
}
uint64_t VersionSet::GetObsoleteSstFilesSize() const {
uint64_t ret = 0;
for (auto& f : obsolete_files_) {
if (f.metadata != nullptr) {
ret += f.metadata->fd.GetFileSize();
}
}
return ret;
}
ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& cf_options, const ReadOptions& read_options,
const VersionEdit* edit, bool read_only) {
assert(edit->IsColumnFamilyAdd());
assert(!unchanging_ || read_only);
MutableCFOptions dummy_cf_options;
Version* dummy_versions =
new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
dummy_versions->Ref();
auto new_cfd = column_family_set_->CreateColumnFamily(
edit->GetColumnFamilyName(), edit->GetColumnFamily(), dummy_versions,
cf_options, read_only);
Version* v = new Version(new_cfd, this, file_options_,
new_cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
constexpr bool update_stats = false;
v->PrepareAppend(read_options, update_stats);
AppendVersion(new_cfd, v);
new_cfd->CreateNewMemtable(LastSequence());
new_cfd->SetLogNumber(edit->GetLogNumber());
return new_cfd;
}
uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
uint64_t count = 0;
for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
count++;
}
return count;
}
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
std::unordered_set<uint64_t> unique_files;
uint64_t total_files_size = 0;
for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
VersionStorageInfo* storage_info = v->storage_info();
for (int level = 0; level < storage_info->num_levels_; level++) {
for (const auto& file_meta : storage_info->LevelFiles(level)) {
if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
unique_files.end()) {
unique_files.insert(file_meta->fd.packed_number_and_path_id);
total_files_size += file_meta->fd.GetFileSize();
}
}
}
}
return total_files_size;
}
uint64_t VersionSet::GetTotalBlobFileSize(Version* dummy_versions) {
std::unordered_set<uint64_t> unique_blob_files;
uint64_t all_versions_blob_file_size = 0;
for (auto* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
const auto* vstorage = v->storage_info();
assert(vstorage);
const auto& blob_files = vstorage->GetBlobFiles();
for (const auto& meta : blob_files) {
assert(meta);
const uint64_t blob_file_number = meta->GetBlobFileNumber();
if (unique_blob_files.find(blob_file_number) == unique_blob_files.end()) {
unique_blob_files.insert(blob_file_number);
all_versions_blob_file_size += meta->GetBlobFileSize();
}
}
}
return all_versions_blob_file_size;
}
Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
ColumnFamilyData* cfd,
const std::string& fpath, int level,
const FileMetaData& meta) {
uint64_t fsize = 0;
Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
if (status.ok()) {
if (fsize != meta.fd.GetFileSize()) {
status = Status::Corruption("File size mismatch: " + fpath);
}
}
if (status.ok() && db_options_->verify_sst_unique_id_in_manifest) {
assert(cfd);
TableCache* table_cache = cfd->table_cache();
assert(table_cache);
const auto& cf_opts = cfd->GetLatestMutableCFOptions();
size_t max_sz_for_l0_meta_pin = MaxFileSizeForL0MetaPin(cf_opts);
const FileOptions& file_opts = file_options();
Version* version = cfd->current();
assert(version);
VersionStorageInfo& storage_info = version->storage_info_;
const InternalKeyComparator* icmp = storage_info.InternalComparator();
assert(icmp);
InternalStats* internal_stats = cfd->internal_stats();
TableCache::TypedHandle* handle = nullptr;
FileMetaData meta_copy = meta;
status = table_cache->FindTable(
read_options, file_opts, *icmp, meta_copy, &handle, cf_opts,
false, internal_stats->GetFileReadHist(level), false, level,
false, max_sz_for_l0_meta_pin,
meta_copy.temperature);
if (handle) {
table_cache->get_cache().Release(handle);
}
}
return status;
}
ReactiveVersionSet::ReactiveVersionSet(
const std::string& dbname, const ImmutableDBOptions* imm_db_options,
const MutableDBOptions& mutable_db_options,
const FileOptions& _file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager, WriteController* write_controller,
const std::shared_ptr<IOTracer>& io_tracer)
: VersionSet(dbname, imm_db_options, mutable_db_options, _file_options,
table_cache, write_buffer_manager, write_controller,
nullptr, io_tracer, "",
"", "",
nullptr, false) {}
ReactiveVersionSet::~ReactiveVersionSet() = default;
Status ReactiveVersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
std::unique_ptr<Status>* manifest_reader_status) {
assert(manifest_reader != nullptr);
assert(manifest_reporter != nullptr);
assert(manifest_reader_status != nullptr);
manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
if (!s.ok()) {
return s;
}
log::Reader* reader = manifest_reader->get();
assert(reader);
manifest_tailer_.reset(new ManifestTailer(
column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_,
read_options_, EpochNumberRequirement::kMightMissing));
manifest_tailer_->Iterate(*reader, manifest_reader_status->get());
s = manifest_tailer_->status();
if (s.ok()) {
RecoverEpochNumbers();
}
return s;
}
Status ReactiveVersionSet::ReadAndApply(
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
Status* manifest_read_status,
std::unordered_set<ColumnFamilyData*>* cfds_changed,
std::vector<std::string>* files_to_delete) {
assert(manifest_reader != nullptr);
assert(cfds_changed != nullptr);
mu->AssertHeld();
Status s;
log::Reader* reader = manifest_reader->get();
assert(reader);
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
if (!s.ok()) {
return s;
}
manifest_tailer_->Iterate(*(manifest_reader->get()), manifest_read_status);
s = manifest_tailer_->status();
if (s.ok()) {
*cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
}
if (files_to_delete) {
*files_to_delete = manifest_tailer_->GetAndClearIntermediateFiles();
}
return s;
}
Status ReactiveVersionSet::MaybeSwitchManifest(
log::Reader::Reporter* reporter,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
assert(manifest_reader != nullptr);
Status s;
std::string manifest_path;
s = GetCurrentManifestPath(dbname_, fs_.get(), false,
&manifest_path, &manifest_file_number_);
if (!s.ok()) {
return s;
}
std::unique_ptr<FSSequentialFile> manifest_file;
if (manifest_reader->get() != nullptr &&
manifest_reader->get()->file()->file_name() == manifest_path) {
return s;
}
assert(nullptr == manifest_reader->get() ||
manifest_reader->get()->file()->file_name() != manifest_path);
s = fs_->FileExists(manifest_path, IOOptions(), nullptr);
if (s.IsNotFound()) {
return Status::TryAgain(
"The primary may have switched to a new MANIFEST and deleted the old "
"one.");
} else if (!s.ok()) {
return s;
}
TEST_SYNC_POINT(
"ReactiveVersionSet::MaybeSwitchManifest:"
"AfterGetCurrentManifestPath:0");
TEST_SYNC_POINT(
"ReactiveVersionSet::MaybeSwitchManifest:"
"AfterGetCurrentManifestPath:1");
s = fs_->NewSequentialFile(manifest_path,
fs_->OptimizeForManifestRead(file_options_),
&manifest_file, nullptr);
std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) {
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter, true ,
0 ));
ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
manifest_path.c_str());
if (manifest_tailer_) {
manifest_tailer_->PrepareToReadNewManifest();
}
} else if (s.IsPathNotFound()) {
s = Status::TryAgain(
"The primary may have switched to a new MANIFEST and deleted the old "
"one.");
}
return s;
}
#ifndef NDEBUG
uint64_t ReactiveVersionSet::TEST_read_edits_in_atomic_group() const {
assert(manifest_tailer_);
return manifest_tailer_->GetReadBuffer().TEST_read_edits_in_atomic_group();
}
#endif
std::vector<VersionEdit>& ReactiveVersionSet::replay_buffer() {
assert(manifest_tailer_);
return manifest_tailer_->GetReadBuffer().replay_buffer();
}
}