#pragma once
#include "file/file_util.h"
#include "memory/memory_allocator_impl.h"
#include "table/block_based/block.h"
#include "table/block_based/block_type.h"
#include "table/format.h"
#include "table/persistent_cache_options.h"
#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
class BlockFetcher {
public:
BlockFetcher(RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer,
const Footer& footer ,
const ReadOptions& read_options,
const BlockHandle& handle ,
BlockContents* contents,
const ImmutableOptions& ioptions ,
bool do_uncompress, bool maybe_compressed, BlockType block_type,
UnownedPtr<Decompressor> decompressor,
const PersistentCacheOptions& cache_options ,
MemoryAllocator* memory_allocator = nullptr,
MemoryAllocator* memory_allocator_compressed = nullptr,
bool for_compaction = false)
: file_(file),
prefetch_buffer_(prefetch_buffer),
footer_(footer),
read_options_(read_options),
handle_(handle),
contents_(contents),
ioptions_(ioptions),
do_uncompress_(do_uncompress),
maybe_compressed_(maybe_compressed),
block_type_(block_type),
block_size_(static_cast<size_t>(handle_.size())),
block_size_with_trailer_(block_size_ + footer.GetBlockTrailerSize()),
decompressor_(decompressor),
cache_options_(cache_options),
memory_allocator_(memory_allocator),
memory_allocator_compressed_(memory_allocator_compressed),
for_compaction_(for_compaction) {
io_status_.PermitUncheckedError(); if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
use_fs_scratch_ = true;
}
if (CheckFSFeatureSupport(ioptions_.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
retry_corrupt_read_ = true;
}
}
IOStatus ReadBlockContents();
IOStatus ReadAsyncBlockContents();
inline CompressionType compression_type() const {
return decomp_args_.compression_type;
}
inline CompressionType& compression_type() {
return decomp_args_.compression_type;
}
inline size_t GetBlockSizeWithTrailer() const {
return block_size_with_trailer_;
}
inline Slice& GetCompressedBlock() {
assert(compression_type() != kNoCompression);
return slice_;
}
#ifndef NDEBUG
int TEST_GetNumStackBufMemcpy() const { return num_stack_buf_memcpy_; }
int TEST_GetNumHeapBufMemcpy() const { return num_heap_buf_memcpy_; }
int TEST_GetNumCompressedBufMemcpy() const {
return num_compressed_buf_memcpy_;
}
#endif
private:
#ifndef NDEBUG
int num_stack_buf_memcpy_ = 0;
int num_heap_buf_memcpy_ = 0;
int num_compressed_buf_memcpy_ = 0;
#endif
static const uint32_t kDefaultStackBufferSize = 5000;
RandomAccessFileReader* file_;
FilePrefetchBuffer* prefetch_buffer_;
const Footer& footer_;
const ReadOptions read_options_;
const BlockHandle& handle_;
BlockContents* contents_;
const ImmutableOptions& ioptions_;
const bool do_uncompress_;
const bool maybe_compressed_;
const BlockType block_type_;
const size_t block_size_;
const size_t block_size_with_trailer_;
UnownedPtr<Decompressor> decompressor_;
const PersistentCacheOptions& cache_options_;
MemoryAllocator* memory_allocator_;
MemoryAllocator* memory_allocator_compressed_;
IOStatus io_status_;
Slice slice_;
char* used_buf_ = nullptr;
AlignedBuf direct_io_buf_;
CacheAllocationPtr heap_buf_;
CacheAllocationPtr compressed_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
bool for_compaction_ = false;
bool use_fs_scratch_ = false;
bool retry_corrupt_read_ = false;
FSAllocationPtr fs_buf_;
Decompressor::Args decomp_args_;
bool TryGetUncompressBlockFromPersistentCache();
bool TryGetFromPrefetchBuffer();
bool TryGetSerializedBlockFromPersistentCache();
void PrepareBufferForBlockFromFile();
void CopyBufferToHeapBuf();
void CopyBufferToCompressedBuf();
void GetBlockContents();
void InsertCompressedBlockToPersistentCacheIfNeeded();
void InsertUncompressedBlockToPersistentCacheIfNeeded();
void ProcessTrailerIfPresent();
void ReadBlock(bool retry);
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
if (use_fs_scratch_) {
if (read_req->fs_scratch != nullptr) {
read_req->fs_scratch.reset();
read_req->fs_scratch = nullptr;
}
}
}
};
}