#include "table/block_based/partitioned_index_reader.h"
#include "block_cache.h"
#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"
namespace ROCKSDB_NAMESPACE {
// Factory for PartitionIndexReader. Optionally reads the top-level index
// block eagerly (when prefetching or when the block cache is bypassed) and,
// if `pin` is set, keeps that block pinned inside the new reader.
//
// Returns a non-OK status only if the eager read of the index block fails;
// on success `*index_reader` owns the newly created reader.
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  // Pinning without having prefetched the block would be meaningless.
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  const bool read_now = prefetch || !use_cache;
  if (read_now) {
    const Status read_status =
        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!read_status.ok()) {
      return read_status;
    }
    // If the block landed in the block cache and we are not pinning it,
    // drop our reference now; it can be looked up again on demand.
    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));
  return Status::OK();
}
// Creates an iterator over the partitioned index. If all index partitions
// were previously loaded and pinned into partition_map_ (see
// CacheDependencies with pin=true), returns a two-level iterator over that
// in-memory map; otherwise returns a PartitionedIndexIterator that reads
// partition blocks on demand.
//
// On failure to obtain the top-level index block, the error is reported
// through `iter` (if the caller supplied one to reuse) or via a freshly
// allocated error iterator.
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* unused (unnamed in decl) */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  CachableEntry<Block> index_block;
  // Fetch the top-level index block, from the block cache or the file.
  const Status s = GetOrReadIndexBlock(get_context, lookup_context,
                                       &index_block, read_options);
  if (!s.ok()) {
    if (iter != nullptr) {
      // Reuse the caller-provided iterator to carry the error status.
      iter->Invalidate(s);
      return iter;
    }
    return NewErrorInternalIterator<IndexValue>(s);
  }
  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;
  Statistics* kNullStats = nullptr;
  if (!partition_map_.empty()) {
    // All partitions are pinned in partition_map_: serve reads through a
    // two-level iterator whose second level comes from the in-memory map.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(),
            // NOTE(review): unnamed bool arg — presumably
            // block_contents_pinned; confirm against Block::NewIndexIterator.
            false,
            user_defined_timestamps_persisted()));
  } else {
    ReadOptions ro{read_options};
    // NOTE(review): readahead_size is reset to the default for the top-level
    // index reads — presumably so caller-specified readahead applies only to
    // the partition/data reads; confirm.
    ro.readahead_size = ReadOptions{}.readahead_size;
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(),
            false,  // see NOTE above on the unnamed bool argument
            user_defined_timestamps_persisted()));
    // Partition blocks are loaded lazily as this iterator advances.
    it = new PartitionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }
  assert(it != nullptr);
  // Hand the top-level index block (cache handle or owned value) to the
  // iterator so it remains alive for the iterator's lifetime.
  index_block.TransferTo(it);
  return it;
}
// Loads every index partition block into the block cache (prefetch), and —
// when `pin` is true — also retains a reference to each in partition_map_ so
// that NewIterator can serve entirely from memory. The map is only installed
// if every partition was successfully retained.
Status PartitionIndexReader::CacheDependencies(
    const ReadOptions& ro, bool pin, FilePrefetchBuffer* tail_prefetch_buffer) {
  if (!partition_map_.empty()) {
    // Already populated by an earlier call; nothing to do.
    return Status::OK();
  }
  // Before read partitions, prefetch them to avoid lots of IOs.
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;
  CachableEntry<Block> index_block;
  {
    // get_context is not needed for a prefetch-driven read.
    Status s = GetOrReadIndexBlock(nullptr, &lookup_context,
                                   &index_block, ro);
    if (!s.ok()) {
      return s;
    }
  }
  // Iterate the top-level index to find the file range covered by the
  // partitions: [first partition's offset, last partition's end).
  index_block.GetValue()->NewIndexIterator(
      internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
      // NOTE(review): unnamed bool arg — presumably block_contents_pinned;
      // confirm against Block::NewIndexIterator.
      false, user_defined_timestamps_persisted());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();
  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  // Only issue our own prefetch if the tail prefetch buffer does not already
  // cover the partition range.
  if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
      tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
    rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
                                  nullptr,
                                  FilePrefetchBufferUsage::kUnknown);
    IOOptions opts;
    {
      // Prefetch the whole partition range in one IO.
      Status s = rep->file->PrepareIOOptions(ro, opts);
      if (s.ok()) {
        s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                      static_cast<size_t>(prefetch_len));
      }
      if (!s.ok()) {
        return s;
      }
    }
  }
  // For saving "all or nothing" to partition_map_
  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;
  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  size_t partition_count = 0;
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    ++partition_count;
    // Read from prefetched data (our buffer, or the tail buffer when we
    // skipped prefetching) and insert into the block cache.
    Status s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
        handle, rep->decompressor.get(),
        false, &block.As<Block_kIndex>(),
        nullptr, &lookup_context, nullptr,
        false, true);
    if (!s.ok()) {
      return s;
    }
    if (block.GetValue() != nullptr) {
      // Only pin blocks we actually hold a reference to (cached or owned).
      if (block.IsCached() || block.GetOwnValue()) {
        if (pin) {
          map_in_progress[handle.offset()] = std::move(block);
        }
      }
    }
  }
  Status s = biter.status();
  // Install the map only if every partition was captured; a partial map
  // would make the two-level in-memory path incorrect.
  if (map_in_progress.size() == partition_count && s.ok()) {
    std::swap(partition_map_, map_in_progress);
  }
  return s;
}
// Best-effort eviction of this reader's blocks from the block cache before
// the table is closed. With aggressiveness 0 this is a no-op (aside from the
// base-class call). Partition blocks are erased either via the pinned
// partition_map_ or, when nothing is pinned, by walking the top-level index
// (cache-only read, no IO) and erasing each partition by handle.
void PartitionIndexReader::EraseFromCacheBeforeDestruction(
    uint32_t uncache_aggressiveness) {
  if (uncache_aggressiveness > 0) {
    CachableEntry<Block> top_level_block;
    // Cache-only lookup: we must not issue file IO on this shutdown path.
    ReadOptions ro_no_io;
    ro_no_io.read_tier = ReadTier::kBlockCacheTier;
    // NOTE(review): first nullptr is get_context, second is lookup_context —
    // confirm against GetOrReadIndexBlock's declaration. Failure is
    // intentionally ignored (best effort).
    GetOrReadIndexBlock(nullptr,
                        nullptr, &top_level_block, ro_no_io)
        .PermitUncheckedError();
    if (!partition_map_.empty()) {
      // All partitions are pinned: release each entry, erasing it from the
      // cache if ours is the last reference.
      for (auto& e : partition_map_) {
        e.second.ResetEraseIfLastRef();
      }
    } else if (!top_level_block.IsEmpty()) {
      // Nothing pinned: enumerate partition handles from the top-level index
      // and erase them from the cache directly.
      IndexBlockIter biter;
      const InternalKeyComparator* const comparator = internal_comparator();
      Statistics* kNullStats = nullptr;
      top_level_block.GetValue()->NewIndexIterator(
          comparator->user_comparator(),
          table()->get_rep()->get_global_seqno(BlockType::kIndex), &biter,
          kNullStats, true, index_has_first_key(),
          index_key_includes_seq(), index_value_is_full(),
          // NOTE(review): unnamed bool arg — presumably
          // block_contents_pinned; confirm against Block::NewIndexIterator.
          false,
          user_defined_timestamps_persisted());
      // The advisor decides how many misses to tolerate before giving up,
      // based on uncache_aggressiveness.
      UncacheAggressivenessAdvisor advisor(uncache_aggressiveness);
      for (biter.SeekToFirst(); biter.Valid() && advisor.ShouldContinue();
           biter.Next()) {
        bool erased = table()->EraseFromCache(biter.value().handle);
        advisor.Report(erased);
      }
      // Best effort: ignore any iterator error.
      biter.status().PermitUncheckedError();
    }
    // Release (and possibly erase) the top-level index block itself.
    top_level_block.ResetEraseIfLastRef();
  }
  // Let the base class erase the top-level index block entry as well.
  BlockBasedTable::IndexReaderCommon::EraseFromCacheBeforeDestruction(
      uncache_aggressiveness);
}
}