#pragma once
#include <cstdint>
#include <memory>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/seqno_to_time_mapping.h"
#include "file/filename.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_cache.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/uncompression_dict_reader.h"
#include "table/format.h"
#include "table/persistent_cache_options.h"
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/atomic.h"
#include "util/cast_util.h"
#include "util/coro_utils.h"
#include "util/hash_containers.h"
namespace ROCKSDB_NAMESPACE {
// Forward declarations of types that this header refers to by name.
class Cache;
class FilterBlockReader;
class FullFilterBlockReader;
class Footer;
class InternalKeyComparator;
class Iterator;
class FSRandomAccessFile;
class TableCache;
class TableReader;
class WritableFile;
struct BlockBasedTableOptions;
struct EnvOptions;
struct ReadOptions;
class GetContext;
// One element per block: the (key, value) string pairs of that block.
// Used as the element type of the output of
// BlockBasedTable::GetKVPairsFromDataBlocks().
using KVPairBlock = std::vector<std::pair<std::string, std::string>>;
class BlockBasedTable : public TableReader {
public:
static const std::string kObsoleteFilterBlockPrefix;
static const std::string kFullFilterBlockPrefix;
static const std::string kPartitionedFilterBlockPrefix;
static constexpr size_t kBlockTrailerSize = 5;
static Status Open(
const ReadOptions& ro, const ImmutableOptions& ioptions,
const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_key_comparator,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
uint8_t block_protection_bytes_per_key,
std::unique_ptr<TableReader>* table_reader, uint64_t tail_size,
std::shared_ptr<CacheReservationManager> table_reader_cache_res_mgr =
nullptr,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
UnownedPtr<CompressionManager> compression_manager = nullptr,
bool prefetch_index_and_filter_in_cache = true, bool skip_filters = false,
int level = -1, const bool immortal_table = false,
const SequenceNumber largest_seqno = 0,
bool force_direct_prefetch = false,
TailPrefetchStats* tail_prefetch_stats = nullptr,
BlockCacheTracer* const block_cache_tracer = nullptr,
size_t max_file_size_for_l0_meta_pin = 0,
const std::string& cur_db_session_id = "", uint64_t cur_file_num = 0,
UniqueId64x2 expected_unique_id = {},
const bool user_defined_timestamps_persisted = true);
bool PrefixRangeMayMatch(const Slice& internal_key,
const ReadOptions& read_options,
const SliceTransform* options_prefix_extractor,
const bool need_upper_bound_check,
BlockCacheLookupContext* lookup_context,
bool* filter_checked) const;
InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor,
Arena* arena, bool skip_filters,
TableReaderCaller caller,
size_t compaction_readahead_size = 0,
bool allow_unprepared_value = false) override;
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options) override;
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
SequenceNumber read_seqno, const Slice* timestamp) override;
Status Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context, const SliceTransform* prefix_extractor,
bool skip_filters = false) override;
Status MultiGetFilter(const ReadOptions& read_options,
const SliceTransform* prefix_extractor,
MultiGetRange* mget_range) override;
DECLARE_SYNC_AND_ASYNC_OVERRIDE(void, MultiGet,
const ReadOptions& readOptions,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor,
bool skip_filters = false);
Status Prefetch(const ReadOptions& read_options, const Slice* begin,
const Slice* end) override;
uint64_t ApproximateOffsetOf(const ReadOptions& read_options,
const Slice& key,
TableReaderCaller caller) override;
uint64_t ApproximateSize(const ReadOptions& read_options, const Slice& start,
const Slice& end, TableReaderCaller caller) override;
Status ApproximateKeyAnchors(const ReadOptions& read_options,
std::vector<Anchor>& anchors) override;
bool EraseFromCache(const BlockHandle& handle) const;
bool TEST_BlockInCache(const BlockHandle& handle) const;
bool TEST_KeyInCache(const ReadOptions& options, const Slice& key);
void TEST_GetDataBlockHandle(const ReadOptions& options, const Slice& key,
BlockHandle& handle);
void SetupForCompaction() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;
const SeqnoToTimeMapping& GetSeqnoToTimeMapping() const;
size_t ApproximateMemoryUsage() const override;
Status DumpTable(WritableFile* out_file,
bool show_sequence_number_type = false) override;
Status VerifyChecksum(const ReadOptions& readOptions,
TableReaderCaller caller,
bool meta_blocks_only = false) override;
void MarkObsolete(uint32_t uncache_aggressiveness) override;
~BlockBasedTable();
bool TEST_FilterBlockInCache() const;
bool TEST_IndexBlockInCache() const;
class IndexReader {
public:
virtual ~IndexReader() = default;
virtual InternalIteratorBase<IndexValue>* NewIterator(
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) = 0;
virtual size_t ApproximateMemoryUsage() const = 0;
virtual Status CacheDependencies(
const ReadOptions& , bool ,
FilePrefetchBuffer* ) {
return Status::OK();
}
virtual void EraseFromCacheBeforeDestruction(
uint32_t ) {}
};
class IndexReaderCommon;
static void SetupBaseCacheKey(const TableProperties* properties,
const std::string& cur_db_session_id,
uint64_t cur_file_number,
OffsetableCacheKey* out_base_cache_key,
bool* out_is_stable = nullptr);
static CacheKey GetCacheKey(const OffsetableCacheKey& base_cache_key,
const BlockHandle& handle);
static void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage,
bool redundant,
Statistics* const statistics);
Statistics* GetStatistics() const;
bool IsLastLevel() const;
static inline size_t BlockSizeWithTrailer(const BlockHandle& handle) {
return static_cast<size_t>(handle.size() + kBlockTrailerSize);
}
static inline CompressionType GetBlockCompressionType(const char* block_data,
size_t block_size) {
return static_cast<CompressionType>(block_data[block_size]);
}
static inline CompressionType GetBlockCompressionType(
const BlockContents& contents) {
assert(contents.has_trailer);
return GetBlockCompressionType(contents.data.data(), contents.data.size());
}
Status GetKVPairsFromDataBlocks(const ReadOptions& read_options,
std::vector<KVPairBlock>* kv_pair_blocks);
template <typename TBlocklike>
Status LookupAndPinBlocksInCache(
const ReadOptions& ro, const BlockHandle& handle,
CachableEntry<TBlocklike>* out_parsed_block) const;
template <typename TBlocklike>
Status CreateAndPinBlockInCache(
const ReadOptions& ro, const BlockHandle& handle,
UnownedPtr<Decompressor> decomp, BlockContents* block_contents,
CachableEntry<TBlocklike>* out_parsed_block) const;
struct Rep;
Rep* get_rep() { return rep_; }
const Rep* get_rep() const { return rep_; }
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& block_handle,
TBlockIter* input_iter, BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
Status& s, bool use_block_cache_for_lookup) const;
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(const ReadOptions& ro,
CachableEntry<Block>& block,
TBlockIter* input_iter, Status s) const;
class PartitionedIndexIteratorState;
template <typename TBlocklike>
friend class FilterBlockReaderCommon;
friend class PartitionIndexReader;
friend class UncompressionDictReader;
protected:
Rep* rep_;
explicit BlockBasedTable(Rep* rep, BlockCacheTracer* const block_cache_tracer)
: rep_(rep), block_cache_tracer_(block_cache_tracer) {}
explicit BlockBasedTable(const TableReader&) = delete;
void operator=(const TableReader&) = delete;
private:
friend class MockedBlockBasedTable;
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
BlockCacheTracer* const block_cache_tracer_;
void UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context,
size_t usage) const;
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
template <typename TBlockIter>
static TBlockIter* InitBlockIterator(const Rep* rep, Block* block,
BlockType block_type,
TBlockIter* input_iter,
bool block_contents_pinned);
template <typename TBlocklike>
WithBlocklikeCheck<Status, TBlocklike> MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, UnownedPtr<Decompressor> decomp,
bool for_compaction, CachableEntry<TBlocklike>* block_entry,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents, bool async_read,
bool use_block_cache_for_lookup) const;
template <typename TBlocklike>
WithBlocklikeCheck<Status, TBlocklike> RetrieveBlock(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, UnownedPtr<Decompressor> decomp,
CachableEntry<TBlocklike>* block_entry, GetContext* get_context,
BlockCacheLookupContext* lookup_context, bool for_compaction,
bool use_cache, bool async_read, bool use_block_cache_for_lookup) const;
template <typename TBlocklike>
WithBlocklikeCheck<void, TBlocklike> SaveLookupContextOrTraceRecord(
const Slice& block_key, bool is_cache_hit, const ReadOptions& ro,
const TBlocklike* parsed_block_value,
BlockCacheLookupContext* lookup_context) const;
void FinishTraceRecord(const BlockCacheLookupContext& lookup_context,
const Slice& block_key, const Slice& referenced_key,
bool does_referenced_key_exist,
uint64_t referenced_data_size) const;
DECLARE_SYNC_AND_ASYNC_CONST(
void, RetrieveMultipleBlocks, const ReadOptions& options,
const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
Status* statuses, CachableEntry<Block_kData>* results, char* scratch,
UnownedPtr<Decompressor> decomp, bool use_fs_scratch);
InternalIteratorBase<IndexValue>* NewIndexIterator(
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* input_iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) const;
template <typename TBlocklike>
Cache::Priority GetCachePriority() const;
template <typename TBlocklike>
WithBlocklikeCheck<Status, TBlocklike> GetDataBlockFromCache(
const Slice& cache_key, BlockCacheInterface<TBlocklike> block_cache,
CachableEntry<TBlocklike>* block, GetContext* get_context,
UnownedPtr<Decompressor> decomp) const;
template <typename TBlocklike>
WithBlocklikeCheck<Status, TBlocklike> PutDataBlockToCache(
const Slice& cache_key, BlockCacheInterface<TBlocklike> block_cache,
CachableEntry<TBlocklike>* cached_block,
BlockContents&& uncompressed_block_contents,
BlockContents&& compressed_block_contents,
CompressionType block_comp_type, UnownedPtr<Decompressor> decomp,
MemoryAllocator* memory_allocator, GetContext* get_context) const;
friend class TableCache;
friend class BlockBasedTableBuilder;
Status CreateIndexReader(const ReadOptions& ro,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* preloaded_meta_index_iter,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader);
bool FullFilterKeyMayMatch(FilterBlockReader* filter, const Slice& user_key,
const SliceTransform* prefix_extractor,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
const ReadOptions& read_options) const;
void FullFilterKeysMayMatch(FilterBlockReader* filter, MultiGetRange* range,
const SliceTransform* prefix_extractor,
BlockCacheLookupContext* lookup_context,
const ReadOptions& read_options) const;
static Status PrefetchTail(
const ReadOptions& ro, const ImmutableOptions& ioptions,
RandomAccessFileReader* file, uint64_t file_size,
bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
const bool prefetch_all, const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer, Statistics* stats,
uint64_t tail_size, Logger* const logger);
Status ReadMetaIndexBlock(const ReadOptions& ro,
FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<Block>* metaindex_block,
std::unique_ptr<InternalIterator>* iter);
Status ReadPropertiesBlock(const ReadOptions& ro,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const SequenceNumber largest_seqno);
Status ReadRangeDelBlock(const ReadOptions& ro,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator,
BlockCacheLookupContext* lookup_context);
Status PrefetchIndexAndFilterBlocks(
const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table,
bool prefetch_all, const BlockBasedTableOptions& table_options,
const int level, size_t file_size, size_t max_file_size_for_l0_meta_pin,
BlockCacheLookupContext* lookup_context);
static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name);
Status VerifyChecksumInMetaBlocks(const ReadOptions& read_options,
InternalIteratorBase<Slice>* index_iter);
Status VerifyChecksumInBlocks(const ReadOptions& read_options,
InternalIteratorBase<IndexValue>* index_iter);
std::unique_ptr<FilterBlockReader> CreateFilterBlockReader(
const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context);
uint64_t GetApproximateDataSize();
uint64_t ApproximateDataOffsetOf(
const InternalIteratorBase<IndexValue>& index_iter,
uint64_t data_size) const;
Status DumpIndexBlock(std::ostream& out_stream);
Status DumpDataBlocks(std::ostream& out_stream,
bool show_sequence_number_type = false);
void DumpKeyValue(const Slice& key, const Slice& value,
std::ostream& out_stream,
bool show_sequence_number_type = false);
void DumpBlockChecksumInfo(const BlockHandle& block_handle,
const ReadOptions& read_options,
const char* block_name,
std::ostream& out_stream) const;
bool PrefixExtractorChanged(const SliceTransform* prefix_extractor) const;
bool TimestampMayMatch(const ReadOptions& read_options) const;
bool BlockTypeMaybeCompressed(BlockType type) const {
return type != BlockType::kFilter &&
type != BlockType::kCompressionDictionary &&
type != BlockType::kUserDefinedIndex;
}
static constexpr size_t kMultiGetReadStackBufSize = 8192;
friend class PartitionedFilterBlockReader;
friend class PartitionedFilterBlockTest;
friend class DBBasicTest_MultiGetIOBufferOverrun_Test;
friend class ReadSet;
friend class IODispatcherTest;
};
// TwoLevelIteratorState implementation tied to a BlockBasedTable.
// Holds a pointer to the table and a pointer to a map from uint64_t keys to
// cached Block entries; neither pointer is wrapped in an owning type.
// NOTE(review): the map is presumably keyed by block offset - confirm against
// the definition of NewSecondaryIterator().
class BlockBasedTable::PartitionedIndexIteratorState
: public TwoLevelIteratorState {
public:
PartitionedIndexIteratorState(
const BlockBasedTable* table,
UnorderedMap<uint64_t, CachableEntry<Block>>* block_map);
// Creates the second-level iterator for the block referenced by
// `index_value`.
InternalIteratorBase<IndexValue>* NewSecondaryIterator(
const BlockHandle& index_value) override;
private:
const BlockBasedTable* table_;
UnorderedMap<uint64_t, CachableEntry<Block>>* block_map_;
};
// Per-table state of a BlockBasedTable.
//
// `ioptions`, `env_options` and `internal_comparator` are stored as
// references, so the objects passed to the constructor must outlive the Rep.
// `table_options` is stored as a copy.
struct BlockBasedTable::Rep {
  // Initializes the members that depend on the constructor arguments; every
  // other member keeps its in-class or default initializer. When
  // `skip_filters` is true no filter policy is recorded, even if the table
  // options carry one.
  Rep(const ImmutableOptions& _ioptions, const EnvOptions& _env_options,
      const BlockBasedTableOptions& _table_opt,
      const InternalKeyComparator& _internal_comparator, bool skip_filters,
      uint64_t _file_size, int _level, const bool _immortal_table,
      const bool _user_defined_timestamps_persisted = true)
      : ioptions(_ioptions),
        env_options(_env_options),
        table_options(_table_opt),
        filter_policy(skip_filters ? nullptr : _table_opt.filter_policy.get()),
        internal_comparator(_internal_comparator),
        filter_type(FilterType::kNoFilter),
        index_type(BlockBasedTableOptions::IndexType::kBinarySearch),
        whole_key_filtering(_table_opt.whole_key_filtering),
        prefix_filtering(true),
        global_seqno(kDisableGlobalSequenceNumber),
        file_size(_file_size),
        level(_level),
        immortal_table(_immortal_table),
        user_defined_timestamps_persisted(_user_defined_timestamps_persisted),
        fs_prefetch_support(CheckFSFeatureSupport(
            _ioptions.fs.get(), FSSupportedOps::kFSPrefetch)) {}

  // The stored status may legitimately never be inspected; mark it as
  // permitted-unchecked before it is destroyed.
  ~Rep() { status.PermitUncheckedError(); }

  const ImmutableOptions& ioptions;
  const EnvOptions& env_options;
  const BlockBasedTableOptions table_options;
  // Null when filters are skipped (see constructor).
  const FilterPolicy* const filter_policy;
  const InternalKeyComparator& internal_comparator;
  Status status;
  std::unique_ptr<RandomAccessFileReader> file;
  OffsetableCacheKey base_cache_key;
  PersistentCacheOptions persistent_cache_options;

  Footer footer;

  std::unique_ptr<IndexReader> index_reader;
  std::unique_ptr<FilterBlockReader> filter;
  std::unique_ptr<UncompressionDictReader> uncompression_dict_reader;

  enum class FilterType {
    kNoFilter,
    kFullFilter,
    kPartitionedFilter,
  };
  FilterType filter_type;
  BlockHandle filter_handle;
  BlockHandle compression_dict_handle;

  std::shared_ptr<const TableProperties> table_properties;
  SeqnoToTimeMapping seqno_to_time_mapping;
  BlockHandle index_handle;
  BlockBasedTableOptions::IndexType index_type;
  bool whole_key_filtering;
  bool prefix_filtering;
  std::shared_ptr<const SliceTransform> table_prefix_extractor;

  std::shared_ptr<FragmentedRangeTombstoneList> fragmented_range_dels;

  BlockCreateContext create_context;

  // Starts out as kDisableGlobalSequenceNumber; see get_global_seqno().
  SequenceNumber global_seqno;

  uint64_t file_size;

  // Negative when the level is unknown (see level_for_tracing()).
  int level;

  Slice min_timestamp;
  Slice max_timestamp;

  std::shared_ptr<Decompressor> decompressor;

  bool index_has_first_key = false;
  bool index_key_includes_seq = true;
  bool index_value_is_full = true;

  uint32_t data_block_restart_interval = 0;
  uint32_t index_block_restart_interval = 0;
  bool separate_key_value_in_data_block = false;

  bool verify_checksum_set_on_open = false;

  const bool immortal_table;
  const bool user_defined_timestamps_persisted;
  // Whether the file system reports support for FSSupportedOps::kFSPrefetch.
  const bool fs_prefetch_support;

  RelaxedAtomic<uint32_t> uncache_aggressiveness{0};

  std::unique_ptr<CacheReservationManager::CacheReservationHandle>
      table_reader_cache_res_handle = nullptr;

  CachableEntry<Block_kUserDefinedIndex> udi_block;

  // Global sequence number to use for blocks of `block_type`. Filter
  // partition index and compression dictionary blocks always get
  // kDisableGlobalSequenceNumber; all other types get `global_seqno`.
  SequenceNumber get_global_seqno(BlockType block_type) const {
    return (block_type == BlockType::kFilterPartitionIndex ||
            block_type == BlockType::kCompressionDictionary)
               ? kDisableGlobalSequenceNumber
               : global_seqno;
  }

  // Column family id for trace records; falls back to kUnknownColumnFamily
  // when no table properties are loaded.
  uint64_t cf_id_for_tracing() const {
    return table_properties
               ? table_properties->column_family_id
               : ROCKSDB_NAMESPACE::TablePropertiesCollectorFactory::Context::
                     kUnknownColumnFamily;
  }

  // Column family name for trace records; falls back to
  // kUnknownColumnFamilyName when no table properties are loaded.
  Slice cf_name_for_tracing() const {
    return table_properties ? table_properties->column_family_name
                            : BlockCacheTraceHelper::kUnknownColumnFamilyName;
  }

  // Level for trace records; a negative level is reported as UINT32_MAX.
  uint32_t level_for_tracing() const { return level >= 0 ? level : UINT32_MAX; }

  // File number parsed from the file name, or UINT64_MAX when there is no
  // file reader.
  uint64_t sst_number_for_tracing() const {
    return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX;
  }

  // Replaces `*fpb` with a newly constructed FilePrefetchBuffer. The second
  // constructor argument is true unless mmap reads are allowed.
  void CreateFilePrefetchBuffer(
      const ReadaheadParams& readahead_params,
      std::unique_ptr<FilePrefetchBuffer>* fpb,
      const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
      FilePrefetchBufferUsage usage) const {
    fpb->reset(new FilePrefetchBuffer(
        readahead_params, !ioptions.allow_mmap_reads, false, ioptions.fs.get(),
        ioptions.clock, ioptions.stats, readaheadsize_cb, usage));
  }

  // Same as CreateFilePrefetchBuffer(), but leaves an already existing buffer
  // untouched.
  void CreateFilePrefetchBufferIfNotExists(
      const ReadaheadParams& readahead_params,
      std::unique_ptr<FilePrefetchBuffer>* fpb,
      const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
      FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) const {
    if (!(*fpb)) {
      CreateFilePrefetchBuffer(readahead_params, fpb, readaheadsize_cb, usage);
    }
  }

  // Memory attributed to this object itself: the allocator-reported usable
  // size when ROCKSDB_MALLOC_USABLE_SIZE is defined, sizeof(*this) otherwise.
  std::size_t ApproximateMemoryUsage() const {
    std::size_t usage = 0;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    usage += malloc_usable_size(const_cast<BlockBasedTable::Rep*>(this));
#else
    usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
    // The return statement must sit on its own line: anything that follows
    // `#endif` on the same line is dropped by the preprocessor.
    return usage;
  }
};
// Stream buffer that forwards the characters written to it to a WritableFile
// via Append(). The file pointer is not owned and must stay valid for as long
// as the adapter is used.
class WritableFileStringStreamAdapter : public std::stringbuf {
 public:
  explicit WritableFileStringStreamAdapter(WritableFile* writable_file)
      : file_(writable_file) {}

  // Single-character write path. Returns `ch` when the character was
  // appended, EOF when `ch` is EOF or the append failed.
  int overflow(int ch = EOF) override {
    if (ch == EOF) {
      return EOF;
    }
    // Convert the value to a char first. Reading the first byte of the int
    // in memory only yields the character on little-endian hosts.
    const char byte = static_cast<char>(ch);
    Status s = file_->Append(Slice(&byte, 1));
    return s.ok() ? ch : EOF;
  }

  // Bulk write path. Returns `n` when all bytes were appended and 0 on
  // failure.
  std::streamsize xsputn(char const* p, std::streamsize n) override {
    if (n < 0) {
      // A negative count would turn into a huge unsigned length below.
      return 0;
    }
    Status s = file_->Append(Slice(p, static_cast<size_t>(n)));
    if (!s.ok()) {
      return 0;
    }
    return n;
  }

 private:
  WritableFile* file_;
};
}